Merge pull request #4191 from thingsboard/master

Merge master to develop 3.3
This commit is contained in:
Andrew Shvayka 2021-03-03 15:57:51 +02:00 committed by GitHub
commit a3cb9724e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
198 changed files with 2859 additions and 1049 deletions

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>thingsboard</artifactId>
</parent>
<artifactId>application</artifactId>

View File

@ -47,7 +47,7 @@
"resources": [],
"templateHtml": "<tb-timeseries-table-widget \n [ctx]=\"ctx\">\n</tb-timeseries-table-widget>",
"templateCss": "",
"controllerScript": "self.onInit = function() {\n}\n\nself.onDataUpdated = function() {\n self.ctx.$scope.timeseriesTableWidget.onDataUpdated();\n}\n\nself.actionSources = function() {\n return {\n 'actionCellButton': {\n name: 'widget-action.action-cell-button',\n multiple: true\n },\n 'rowClick': {\n name: 'widget-action.row-click',\n multiple: false\n }\n };\n}\n\nself.onDestroy = function() {\n}",
"controllerScript": "self.onInit = function() {\n}\n\nself.onDataUpdated = function() {\n self.ctx.$scope.timeseriesTableWidget.onDataUpdated();\n}\n\nself.typeParameters = function() {\n return {\n ignoreDataUpdateOnIntervalTick: true\n };\n}\n\nself.actionSources = function() {\n return {\n 'actionCellButton': {\n name: 'widget-action.action-cell-button',\n multiple: true\n },\n 'rowClick': {\n name: 'widget-action.row-click',\n multiple: false\n }\n };\n}\n\nself.onDestroy = function() {\n}",
"settingsSchema": "{\n \"schema\": {\n \"type\": \"object\",\n \"title\": \"TimeseriesTableSettings\",\n \"properties\": {\n \"showTimestamp\": {\n \"title\": \"Display timestamp column\",\n \"type\": \"boolean\",\n \"default\": true\n },\n \"showMilliseconds\": {\n \"title\": \"Display timestamp milliseconds\",\n \"type\": \"boolean\",\n \"default\": false\n },\n \"displayPagination\": {\n \"title\": \"Display pagination\",\n \"type\": \"boolean\",\n \"default\": true\n }, \n \"defaultPageSize\": {\n \"title\": \"Default page size\",\n \"type\": \"number\",\n \"default\": 10\n },\n \"hideEmptyLines\": {\n \"title\": \"Hide empty lines\",\n \"type\": \"boolean\",\n \"default\": false\n }\n },\n \"required\": []\n },\n \"form\": [\n \"showTimestamp\",\n \"showMilliseconds\",\n \"displayPagination\",\n \"defaultPageSize\",\n \"hideEmptyLines\"\n ]\n}",
"dataKeySettingsSchema": "{\n \"schema\": {\n \"type\": \"object\",\n \"title\": \"DataKeySettings\",\n \"properties\": {\n \"useCellStyleFunction\": {\n \"title\": \"Use cell style function\",\n \"type\": \"boolean\",\n \"default\": false\n },\n \"cellStyleFunction\": {\n \"title\": \"Cell style function: f(value)\",\n \"type\": \"string\",\n \"default\": \"\"\n },\n \"useCellContentFunction\": {\n \"title\": \"Use cell content function\",\n \"type\": \"boolean\",\n \"default\": false\n },\n \"cellContentFunction\": {\n \"title\": \"Cell content function: f(value, rowData, ctx)\",\n \"type\": \"string\",\n \"default\": \"\"\n }\n },\n \"required\": []\n },\n \"form\": [\n \"useCellStyleFunction\",\n {\n \"key\": \"cellStyleFunction\",\n \"type\": \"javascript\"\n },\n \"useCellContentFunction\",\n {\n \"key\": \"cellContentFunction\",\n \"type\": \"javascript\"\n }\n ]\n}",
"defaultConfig": "{\"datasources\":[{\"type\":\"function\",\"name\":\"function\",\"dataKeys\":[{\"name\":\"f(x)\",\"type\":\"function\",\"label\":\"Temperature °C\",\"color\":\"#2196f3\",\"settings\":{\"useCellStyleFunction\":true,\"cellStyleFunction\":\"if (value) {\\n var percent = (value + 60)/120 * 100;\\n var color = tinycolor.mix('blue', 'red', amount = percent);\\n color.setAlpha(.5);\\n return {\\n paddingLeft: '20px',\\n color: '#ffffff',\\n background: color.toRgbString(),\\n fontSize: '18px'\\n };\\n} else {\\n return {};\\n}\"},\"_hash\":0.8587686344902596,\"funcBody\":\"var value = prevValue + Math.random() * 40 - 20;\\nvar multiplier = Math.pow(10, 1 || 0);\\nvar value = Math.round(value * multiplier) / multiplier;\\nif (value < -60) {\\n\\tvalue = -60;\\n} else if (value > 60) {\\n\\tvalue = 60;\\n}\\nreturn value;\"},{\"name\":\"f(x)\",\"type\":\"function\",\"label\":\"Humidity, %\",\"color\":\"#ffc107\",\"settings\":{\"useCellStyleFunction\":true,\"cellStyleFunction\":\"if (value) {\\n var percent = value;\\n var backgroundColor = tinycolor('blue');\\n backgroundColor.setAlpha(value/100);\\n var color = 'blue';\\n if (value > 50) {\\n color = 'white';\\n }\\n \\n return {\\n paddingLeft: '20px',\\n color: color,\\n background: backgroundColor.toRgbString(),\\n fontSize: '18px'\\n };\\n} else {\\n return {};\\n}\",\"useCellContentFunction\":false},\"_hash\":0.12775350966079668,\"funcBody\":\"var value = prevValue + Math.random() * 20 - 10;\\nvar multiplier = Math.pow(10, 1 || 0);\\nvar value = Math.round(value * multiplier) / multiplier;\\nif (value < 5) {\\n\\tvalue = 5;\\n} else if (value > 100) {\\n\\tvalue = 100;\\n}\\nreturn value;\"}]}],\"timewindow\":{\"realtime\":{\"interval\":1000,\"timewindowMs\":60000},\"aggregation\":{\"type\":\"NONE\",\"limit\":200}},\"showTitle\":true,\"backgroundColor\":\"rgb(255, 255, 255)\",\"color\":\"rgba(0, 0, 0, 0.87)\",\"padding\":\"8px\",\"settings\":{\"showTimestamp\":true,\"displayPagination\":true,\"defaultPageSize\":10},\"title\":\"Timeseries table\",\"dropShadow\":true,\"enableFullscreen\":true,\"titleStyle\":{\"fontSize\":\"16px\",\"fontWeight\":400,\"padding\":\"5px 10px 5px 10px\"},\"useDashboardTimewindow\":false,\"showLegend\":false,\"widgetStyle\":{},\"actions\":{},\"showTitleIcon\":false,\"iconColor\":\"rgba(0, 0, 0, 0.87)\",\"iconSize\":\"24px\"}"
@ -134,4 +134,4 @@
}
}
]
}
}

View File

@ -84,11 +84,12 @@ BEGIN
END IF;
END IF;
END IF;
END IF;
IF partition_to_delete IS NOT NULL THEN
RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete;
EXECUTE format('DROP TABLE %I', partition_to_delete);
deleted := deleted + 1;
IF partition_to_delete IS NOT NULL THEN
RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete;
EXECUTE format('DROP TABLE IF EXISTS %I', partition_to_delete);
partition_to_delete := NULL;
deleted := deleted + 1;
END IF;
END IF;
END LOOP;
END IF;

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
@ -278,7 +279,21 @@ class DefaultTbContext implements TbContext {
}
public TbMsg deviceCreatedMsg(Device device, RuleNodeId ruleNodeId) {
return entityActionMsg(device, device.getId(), ruleNodeId, DataConstants.ENTITY_CREATED);
RuleChainId ruleChainId = null;
String queueName = ServiceQueue.MAIN;
if (device.getDeviceProfileId() != null) {
DeviceProfile deviceProfile = mainCtx.getDeviceProfileCache().find(device.getDeviceProfileId());
if (deviceProfile == null) {
log.warn("[{}] Device profile is null!", device.getDeviceProfileId());
ruleChainId = null;
queueName = ServiceQueue.MAIN;
} else {
ruleChainId = deviceProfile.getDefaultRuleChainId();
String defaultQueueName = deviceProfile.getDefaultQueueName();
queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;
}
}
return entityActionMsg(device, device.getId(), ruleNodeId, DataConstants.ENTITY_CREATED, queueName, ruleChainId);
}
public TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId) {
@ -286,12 +301,31 @@ class DefaultTbContext implements TbContext {
}
public TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action) {
return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action);
RuleChainId ruleChainId = null;
String queueName = ServiceQueue.MAIN;
if (EntityType.DEVICE.equals(alarm.getOriginator().getEntityType())) {
DeviceId deviceId = new DeviceId(alarm.getOriginator().getId());
DeviceProfile deviceProfile = mainCtx.getDeviceProfileCache().get(getTenantId(), deviceId);
if (deviceProfile == null) {
log.warn("[{}] Device profile is null!", deviceId);
ruleChainId = null;
queueName = ServiceQueue.MAIN;
} else {
ruleChainId = deviceProfile.getDefaultRuleChainId();
String defaultQueueName = deviceProfile.getDefaultQueueName();
queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;
}
}
return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action, queueName, ruleChainId);
}
public <E, I extends EntityId> TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action) {
return entityActionMsg(entity, id, ruleNodeId, action, ServiceQueue.MAIN, null);
}
public <E, I extends EntityId> TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action, String queueName, RuleChainId ruleChainId) {
try {
return TbMsg.newMsg(action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)));
return TbMsg.newMsg(queueName, action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)), ruleChainId, null);
} catch (JsonProcessingException | IllegalArgumentException e) {
throw new RuntimeException("Failed to process " + id.getEntityType().name().toLowerCase() + " " + action + " msg: " + e);
}

View File

@ -215,6 +215,7 @@ public class AuthController extends BaseController {
User user = userService.findUserById(TenantId.SYS_TENANT_ID, credentials.getUserId());
UserPrincipal principal = new UserPrincipal(UserPrincipal.Type.USER_NAME, user.getEmail());
SecurityUser securityUser = new SecurityUser(user, credentials.isEnabled(), principal);
userService.setUserCredentialsEnabled(user.getTenantId(), user.getId(), true);
String baseUrl = systemSecurityService.getBaseUrl(user.getTenantId(), user.getCustomerId(), request);
String loginUrl = String.format("%s/login", baseUrl);
String email = user.getEmail();

View File

@ -94,12 +94,24 @@ public class UserController extends BaseController {
processDashboardIdFromAdditionalInfo((ObjectNode) user.getAdditionalInfo(), DEFAULT_DASHBOARD);
processDashboardIdFromAdditionalInfo((ObjectNode) user.getAdditionalInfo(), HOME_DASHBOARD);
}
UserCredentials userCredentials = userService.findUserCredentialsByUserId(user.getTenantId(), user.getId());
if(userCredentials.isEnabled()) {
addUserCredentialsEnabled((ObjectNode) user.getAdditionalInfo());
}
return user;
} catch (Exception e) {
throw handleException(e);
}
}
private void addUserCredentialsEnabled(ObjectNode additionalInfo) {
if(!additionalInfo.isNull()) {
if(!additionalInfo.has("userCredentialsEnabled")) {
additionalInfo.put("userCredentialsEnabled", true);
}
}
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')")
@RequestMapping(value = "/user/tokenAccessEnabled", method = RequestMethod.GET)
@ResponseBody
@ -193,13 +205,13 @@ public class UserController extends BaseController {
user.getId(), user);
UserCredentials userCredentials = userService.findUserCredentialsByUserId(getCurrentUser().getTenantId(), user.getId());
if (!userCredentials.isEnabled()) {
if (!userCredentials.isEnabled() && userCredentials.getActivateToken() != null) {
String baseUrl = systemSecurityService.getBaseUrl(getTenantId(), getCurrentUser().getCustomerId(), request);
String activateUrl = String.format(ACTIVATE_URL_PATTERN, baseUrl,
userCredentials.getActivateToken());
mailService.sendActivationEmail(activateUrl, email);
} else {
throw new ThingsboardException("User is already active!", ThingsboardErrorCode.BAD_REQUEST_PARAMS);
throw new ThingsboardException("User is already activated!", ThingsboardErrorCode.BAD_REQUEST_PARAMS);
}
} catch (Exception e) {
throw handleException(e);
@ -218,13 +230,13 @@ public class UserController extends BaseController {
User user = checkUserId(userId, Operation.READ);
SecurityUser authUser = getCurrentUser();
UserCredentials userCredentials = userService.findUserCredentialsByUserId(authUser.getTenantId(), user.getId());
if (!userCredentials.isEnabled()) {
if (!userCredentials.isEnabled() && userCredentials.getActivateToken() != null) {
String baseUrl = systemSecurityService.getBaseUrl(getTenantId(), getCurrentUser().getCustomerId(), request);
String activateUrl = String.format(ACTIVATE_URL_PATTERN, baseUrl,
userCredentials.getActivateToken());
return activateUrl;
} else {
throw new ThingsboardException("User is already active!", ThingsboardErrorCode.BAD_REQUEST_PARAMS);
throw new ThingsboardException("User is already activated!", ThingsboardErrorCode.BAD_REQUEST_PARAMS);
}
} catch (Exception e) {
throw handleException(e);

View File

@ -186,7 +186,11 @@ public class ThingsboardInstallService {
log.info("Upgrading ThingsBoard from version 3.2.0 to 3.2.1 ...");
databaseEntitiesUpgradeService.upgradeDatabase("3.2.0");
case "3.2.1":
log.info("Upgrading ThingsBoard from version 3.2.1 to 3.3.0 ...");
log.info("Upgrading ThingsBoard from version 3.2.1 to 3.2.2 ...");
if (databaseTsUpgradeService != null) {
databaseTsUpgradeService.upgradeDatabase("3.2.1");
}
log.info("Updating system data...");
systemDataLoaderService.updateSystemWidgets();
break;

View File

@ -50,6 +50,7 @@ public class CassandraTsDatabaseUpgradeService extends AbstractCassandraDatabase
break;
case "2.5.0":
case "3.1.1":
case "3.2.1":
break;
default:
throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion);

View File

@ -36,6 +36,9 @@ import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.device.profile.AlarmCondition;
import org.thingsboard.server.common.data.device.profile.AlarmConditionFilter;
import org.thingsboard.server.common.data.device.profile.AlarmConditionFilterKey;
import org.thingsboard.server.common.data.device.profile.AlarmConditionKeyType;
import org.thingsboard.server.common.data.device.profile.AlarmRule;
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration;
import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileTransportConfiguration;
@ -290,16 +293,16 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
AlarmCondition temperatureCondition = new AlarmCondition();
temperatureCondition.setSpec(new SimpleAlarmConditionSpec());
KeyFilter temperatureAlarmFlagAttributeFilter = new KeyFilter();
temperatureAlarmFlagAttributeFilter.setKey(new EntityKey(EntityKeyType.ATTRIBUTE, "temperatureAlarmFlag"));
AlarmConditionFilter temperatureAlarmFlagAttributeFilter = new AlarmConditionFilter();
temperatureAlarmFlagAttributeFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.ATTRIBUTE, "temperatureAlarmFlag"));
temperatureAlarmFlagAttributeFilter.setValueType(EntityKeyValueType.BOOLEAN);
BooleanFilterPredicate temperatureAlarmFlagAttributePredicate = new BooleanFilterPredicate();
temperatureAlarmFlagAttributePredicate.setOperation(BooleanFilterPredicate.BooleanOperation.EQUAL);
temperatureAlarmFlagAttributePredicate.setValue(new FilterPredicateValue<>(Boolean.TRUE));
temperatureAlarmFlagAttributeFilter.setPredicate(temperatureAlarmFlagAttributePredicate);
KeyFilter temperatureTimeseriesFilter = new KeyFilter();
temperatureTimeseriesFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"));
AlarmConditionFilter temperatureTimeseriesFilter = new AlarmConditionFilter();
temperatureTimeseriesFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, "temperature"));
temperatureTimeseriesFilter.setValueType(EntityKeyValueType.NUMERIC);
NumericFilterPredicate temperatureTimeseriesFilterPredicate = new NumericFilterPredicate();
temperatureTimeseriesFilterPredicate.setOperation(NumericFilterPredicate.NumericOperation.GREATER);
@ -317,8 +320,8 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
AlarmCondition clearTemperatureCondition = new AlarmCondition();
clearTemperatureCondition.setSpec(new SimpleAlarmConditionSpec());
KeyFilter clearTemperatureTimeseriesFilter = new KeyFilter();
clearTemperatureTimeseriesFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"));
AlarmConditionFilter clearTemperatureTimeseriesFilter = new AlarmConditionFilter();
clearTemperatureTimeseriesFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, "temperature"));
clearTemperatureTimeseriesFilter.setValueType(EntityKeyValueType.NUMERIC);
NumericFilterPredicate clearTemperatureTimeseriesFilterPredicate = new NumericFilterPredicate();
clearTemperatureTimeseriesFilterPredicate.setOperation(NumericFilterPredicate.NumericOperation.LESS_OR_EQUAL);
@ -340,16 +343,16 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
AlarmCondition humidityCondition = new AlarmCondition();
humidityCondition.setSpec(new SimpleAlarmConditionSpec());
KeyFilter humidityAlarmFlagAttributeFilter = new KeyFilter();
humidityAlarmFlagAttributeFilter.setKey(new EntityKey(EntityKeyType.ATTRIBUTE, "humidityAlarmFlag"));
AlarmConditionFilter humidityAlarmFlagAttributeFilter = new AlarmConditionFilter();
humidityAlarmFlagAttributeFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.ATTRIBUTE, "humidityAlarmFlag"));
humidityAlarmFlagAttributeFilter.setValueType(EntityKeyValueType.BOOLEAN);
BooleanFilterPredicate humidityAlarmFlagAttributePredicate = new BooleanFilterPredicate();
humidityAlarmFlagAttributePredicate.setOperation(BooleanFilterPredicate.BooleanOperation.EQUAL);
humidityAlarmFlagAttributePredicate.setValue(new FilterPredicateValue<>(Boolean.TRUE));
humidityAlarmFlagAttributeFilter.setPredicate(humidityAlarmFlagAttributePredicate);
KeyFilter humidityTimeseriesFilter = new KeyFilter();
humidityTimeseriesFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "humidity"));
AlarmConditionFilter humidityTimeseriesFilter = new AlarmConditionFilter();
humidityTimeseriesFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, "humidity"));
humidityTimeseriesFilter.setValueType(EntityKeyValueType.NUMERIC);
NumericFilterPredicate humidityTimeseriesFilterPredicate = new NumericFilterPredicate();
humidityTimeseriesFilterPredicate.setOperation(NumericFilterPredicate.NumericOperation.LESS);
@ -368,8 +371,8 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
AlarmCondition clearHumidityCondition = new AlarmCondition();
clearHumidityCondition.setSpec(new SimpleAlarmConditionSpec());
KeyFilter clearHumidityTimeseriesFilter = new KeyFilter();
clearHumidityTimeseriesFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "humidity"));
AlarmConditionFilter clearHumidityTimeseriesFilter = new AlarmConditionFilter();
clearHumidityTimeseriesFilter.setKey(new AlarmConditionFilterKey(AlarmConditionKeyType.TIME_SERIES, "humidity"));
clearHumidityTimeseriesFilter.setValueType(EntityKeyValueType.NUMERIC);
NumericFilterPredicate clearHumidityTimeseriesFilterPredicate = new NumericFilterPredicate();
clearHumidityTimeseriesFilterPredicate.setOperation(NumericFilterPredicate.NumericOperation.GREATER_OR_EQUAL);

View File

@ -196,11 +196,17 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe
}
break;
case "3.1.1":
case "3.2.1":
try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) {
log.info("Load TTL functions ...");
loadSql(conn, LOAD_TTL_FUNCTIONS_SQL);
log.info("Load Drop Partitions functions ...");
loadSql(conn, LOAD_DROP_PARTITIONS_FUNCTIONS_SQL);
executeQuery(conn, "DROP PROCEDURE IF EXISTS cleanup_timeseries_by_ttl(character varying, bigint, bigint);");
executeQuery(conn, "DROP FUNCTION IF EXISTS delete_asset_records_from_ts_kv(character varying, character varying, bigint);");
executeQuery(conn, "DROP FUNCTION IF EXISTS delete_device_records_from_ts_kv(character varying, character varying, bigint);");
executeQuery(conn, "DROP FUNCTION IF EXISTS delete_customer_records_from_ts_kv(character varying, character varying, bigint);");
}
break;
default:

View File

@ -178,6 +178,7 @@ public class TimescaleTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgr
}
break;
case "3.1.1":
case "3.2.1":
break;
default:
throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion);

View File

@ -145,7 +145,7 @@ public class DefaultTbClusterService implements TbClusterService {
tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId())));
}
}
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId);
log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())

View File

@ -55,31 +55,22 @@ public class TbCoreConsumerStats {
public TbCoreConsumerStats(StatsFactory statsFactory) {
String statsKey = StatsType.CORE.getName();
this.totalCounter = statsFactory.createStatsCounter(statsKey, TOTAL_MSGS);
this.sessionEventCounter = statsFactory.createStatsCounter(statsKey, SESSION_EVENTS);
this.getAttributesCounter = statsFactory.createStatsCounter(statsKey, GET_ATTRIBUTE);
this.subscribeToAttributesCounter = statsFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES);
this.subscribeToRPCCounter = statsFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES);
this.toDeviceRPCCallResponseCounter = statsFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES);
this.subscriptionInfoCounter = statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO);
this.claimDeviceCounter = statsFactory.createStatsCounter(statsKey, DEVICE_CLAIMS);
this.deviceStateCounter = statsFactory.createStatsCounter(statsKey, DEVICE_STATES);
this.subscriptionMsgCounter = statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS);
this.toCoreNotificationsCounter = statsFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS);
this.totalCounter = register(statsFactory.createStatsCounter(statsKey, TOTAL_MSGS));
this.sessionEventCounter = register(statsFactory.createStatsCounter(statsKey, SESSION_EVENTS));
this.getAttributesCounter = register(statsFactory.createStatsCounter(statsKey, GET_ATTRIBUTE));
this.subscribeToAttributesCounter = register(statsFactory.createStatsCounter(statsKey, ATTRIBUTE_SUBSCRIBES));
this.subscribeToRPCCounter = register(statsFactory.createStatsCounter(statsKey, RPC_SUBSCRIBES));
this.toDeviceRPCCallResponseCounter = register(statsFactory.createStatsCounter(statsKey, TO_DEVICE_RPC_CALL_RESPONSES));
this.subscriptionInfoCounter = register(statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_INFO));
this.claimDeviceCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_CLAIMS));
this.deviceStateCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_STATES));
this.subscriptionMsgCounter = register(statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS));
this.toCoreNotificationsCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS));
}
counters.add(totalCounter);
counters.add(sessionEventCounter);
counters.add(getAttributesCounter);
counters.add(subscribeToAttributesCounter);
counters.add(subscribeToRPCCounter);
counters.add(toDeviceRPCCallResponseCounter);
counters.add(subscriptionInfoCounter);
counters.add(claimDeviceCounter);
counters.add(deviceStateCounter);
counters.add(subscriptionMsgCounter);
counters.add(toCoreNotificationsCounter);
private StatsCounter register(StatsCounter counter){
counters.add(counter);
return counter;
}
public void log(TransportProtos.TransportToDeviceActorMsg msg) {

View File

@ -24,7 +24,7 @@ import java.util.regex.Pattern;
@Slf4j
public abstract class AbstractSmsSender implements SmsSender {
private static final Pattern E_164_PHONE_NUMBER_PATTERN = Pattern.compile("^\\+[1-9]\\d{1,14}$");
protected static final Pattern E_164_PHONE_NUMBER_PATTERN = Pattern.compile("^\\+[1-9]\\d{1,14}$");
private static final int MAX_SMS_MESSAGE_LENGTH = 1600;
private static final int MAX_SMS_SEGMENT_LENGTH = 70;

View File

@ -19,21 +19,34 @@ import com.twilio.http.TwilioRestClient;
import com.twilio.rest.api.v2010.account.Message;
import com.twilio.type.PhoneNumber;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.rule.engine.api.sms.exception.SmsParseException;
import org.thingsboard.server.common.data.sms.config.TwilioSmsProviderConfiguration;
import org.thingsboard.rule.engine.api.sms.exception.SmsException;
import org.thingsboard.rule.engine.api.sms.exception.SmsSendException;
import org.thingsboard.server.service.sms.AbstractSmsSender;
import java.util.regex.Pattern;
public class TwilioSmsSender extends AbstractSmsSender {
private static final Pattern PHONE_NUMBERS_SID_MESSAGE_SERVICE_SID = Pattern.compile("^(PN|MG).*$");
private TwilioRestClient twilioRestClient;
private String numberFrom;
private String validatePhoneTwilioNumber(String phoneNumber) throws SmsParseException {
phoneNumber = phoneNumber.trim();
if (!E_164_PHONE_NUMBER_PATTERN.matcher(phoneNumber).matches() && !PHONE_NUMBERS_SID_MESSAGE_SERVICE_SID.matcher(phoneNumber).matches()) {
throw new SmsParseException("Invalid phone number format. Phone number must be in E.164 format/Phone Number's SID/Messaging Service SID.");
}
return phoneNumber;
}
public TwilioSmsSender(TwilioSmsProviderConfiguration config) {
if (StringUtils.isEmpty(config.getAccountSid()) || StringUtils.isEmpty(config.getAccountToken()) || StringUtils.isEmpty(config.getNumberFrom())) {
throw new IllegalArgumentException("Invalid twilio sms provider configuration: accountSid, accountToken and numberFrom should be specified!");
}
this.numberFrom = this.validatePhoneNumber(config.getNumberFrom());
this.numberFrom = this.validatePhoneTwilioNumber(config.getNumberFrom());
this.twilioRestClient = new TwilioRestClient.Builder(config.getAccountSid(), config.getAccountToken()).build();
}

View File

@ -54,6 +54,7 @@ import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd;
@ -92,7 +93,7 @@ import java.util.stream.Collectors;
public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubscriptionService {
private static final int DEFAULT_LIMIT = 100;
private final Map<String, Map<Integer, TbAbstractDataSubCtx>> subscriptionsBySessionId = new ConcurrentHashMap<>();
private final Map<String, Map<Integer, TbAbstractSubCtx>> subscriptionsBySessionId = new ConcurrentHashMap<>();
@Autowired
private TelemetryWebSocketService wsService;
@ -202,7 +203,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
//TODO: validate number of dynamic page links against rate limits. Ignore dynamic flag if limit is reached.
TbEntityDataSubCtx finalCtx = ctx;
ScheduledFuture<?> task = scheduler.scheduleWithFixedDelay(
() -> refreshDynamicQuery(tenantId, customerId, finalCtx),
() -> refreshDynamicQuery(finalCtx),
dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS);
finalCtx.setRefreshTask(task);
}
@ -235,6 +236,26 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}, wsCallBackExecutor);
}
@Override
public void handleCmd(TelemetryWebSocketSessionRef session, EntityCountCmd cmd) {
TbEntityCountSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId());
if (ctx == null) {
ctx = createSubCtx(session, cmd);
long start = System.currentTimeMillis();
ctx.fetchData();
long end = System.currentTimeMillis();
stats.getRegularQueryInvocationCnt().incrementAndGet();
stats.getRegularQueryTimeSpent().addAndGet(end - start);
TbEntityCountSubCtx finalCtx = ctx;
ScheduledFuture<?> task = scheduler.scheduleWithFixedDelay(
() -> refreshDynamicQuery(finalCtx),
dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS);
finalCtx.setRefreshTask(task);
} else {
log.debug("[{}][{}] Received duplicate command: {}", session.getSessionId(), cmd.getCmdId(), cmd);
}
}
@Override
public void handleCmd(TelemetryWebSocketSessionRef session, AlarmDataCmd cmd) {
TbAlarmDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId());
@ -267,7 +288,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}
}
private void refreshDynamicQuery(TenantId tenantId, CustomerId customerId, TbEntityDataSubCtx finalCtx) {
private void refreshDynamicQuery(TbAbstractSubCtx finalCtx) {
try {
long start = System.currentTimeMillis();
finalCtx.update();
@ -299,7 +320,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}
private TbEntityDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) {
Map<Integer, TbAbstractDataSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>());
Map<Integer, TbAbstractSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>());
TbEntityDataSubCtx ctx = new TbEntityDataSubCtx(serviceId, wsService, entityService, localSubscriptionService,
attributesService, stats, sessionRef, cmd.getCmdId(), maxEntitiesPerDataSubscription);
if (cmd.getQuery() != null) {
@ -309,8 +330,20 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
return ctx;
}
private TbEntityCountSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityCountCmd cmd) {
Map<Integer, TbAbstractSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>());
TbEntityCountSubCtx ctx = new TbEntityCountSubCtx(serviceId, wsService, entityService, localSubscriptionService,
attributesService, stats, sessionRef, cmd.getCmdId());
if (cmd.getQuery() != null) {
ctx.setAndResolveQuery(cmd.getQuery());
}
sessionSubs.put(cmd.getCmdId(), ctx);
return ctx;
}
private TbAlarmDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
Map<Integer, TbAbstractDataSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>());
Map<Integer, TbAbstractSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>());
TbAlarmDataSubCtx ctx = new TbAlarmDataSubCtx(serviceId, wsService, entityService, localSubscriptionService,
attributesService, stats, alarmService, sessionRef, cmd.getCmdId(), maxEntitiesPerAlarmSubscription);
ctx.setAndResolveQuery(cmd.getQuery());
@ -319,8 +352,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}
@SuppressWarnings("unchecked")
private <T extends TbAbstractDataSubCtx> T getSubCtx(String sessionId, int cmdId) {
Map<Integer, TbAbstractDataSubCtx> sessionSubs = subscriptionsBySessionId.get(sessionId);
private <T extends TbAbstractSubCtx> T getSubCtx(String sessionId, int cmdId) {
Map<Integer, TbAbstractSubCtx> sessionSubs = subscriptionsBySessionId.get(sessionId);
if (sessionSubs != null) {
return (T) sessionSubs.get(cmdId);
} else {
@ -464,17 +497,16 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
cleanupAndCancel(getSubCtx(sessionId, cmd.getCmdId()));
}
private void cleanupAndCancel(TbAbstractDataSubCtx ctx) {
private void cleanupAndCancel(TbAbstractSubCtx ctx) {
if (ctx != null) {
ctx.cancelTasks();
ctx.clearEntitySubscriptions();
ctx.clearDynamicValueSubscriptions();
ctx.clearSubscriptions();
}
}
@Override
public void cancelAllSessionSubscriptions(String sessionId) {
Map<Integer, TbAbstractDataSubCtx> sessionSubs = subscriptionsBySessionId.remove(sessionId);
Map<Integer, TbAbstractSubCtx> sessionSubs = subscriptionsBySessionId.remove(sessionId);
if (sessionSubs != null) {
sessionSubs.values().forEach(this::cleanupAndCancel);
}

View File

@ -15,32 +15,16 @@
*/
package org.thingsboard.server.service.subscription;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AbstractDataQuery;
import org.thingsboard.server.common.data.query.ComplexFilterPredicate;
import org.thingsboard.server.common.data.query.DynamicValue;
import org.thingsboard.server.common.data.query.DynamicValueSourceType;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataPageLink;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.FilterPredicateType;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.data.query.KeyFilterPredicate;
import org.thingsboard.server.common.data.query.SimpleKeyFilterPredicate;
import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
@ -52,140 +36,25 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@Data
public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends EntityDataPageLink>> {
public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends EntityDataPageLink>> extends TbAbstractSubCtx<T> {
protected final String serviceId;
protected final SubscriptionServiceStatistics stats;
protected final TelemetryWebSocketService wsService;
protected final EntityService entityService;
protected final TbLocalSubscriptionService localSubscriptionService;
protected final AttributesService attributesService;
protected final TelemetryWebSocketSessionRef sessionRef;
protected final int cmdId;
protected final Map<Integer, EntityId> subToEntityIdMap;
protected final Set<Integer> subToDynamicValueKeySet;
@Getter
protected final Map<DynamicValueKey, List<DynamicValue>> dynamicValues;
@Getter
protected PageData<EntityData> data;
@Getter
@Setter
protected T query;
@Setter
protected volatile ScheduledFuture<?> refreshTask;
public TbAbstractDataSubCtx(String serviceId, TelemetryWebSocketService wsService,
EntityService entityService, TbLocalSubscriptionService localSubscriptionService,
AttributesService attributesService, SubscriptionServiceStatistics stats,
TelemetryWebSocketSessionRef sessionRef, int cmdId) {
this.serviceId = serviceId;
this.wsService = wsService;
this.entityService = entityService;
this.localSubscriptionService = localSubscriptionService;
this.attributesService = attributesService;
this.stats = stats;
this.sessionRef = sessionRef;
this.cmdId = cmdId;
super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId);
this.subToEntityIdMap = new ConcurrentHashMap<>();
this.subToDynamicValueKeySet = ConcurrentHashMap.newKeySet();
this.dynamicValues = new ConcurrentHashMap<>();
}
public void setAndResolveQuery(T query) {
dynamicValues.clear();
this.query = query;
if (query != null && query.getKeyFilters() != null) {
for (KeyFilter filter : query.getKeyFilters()) {
registerDynamicValues(filter.getPredicate());
}
}
resolve(getTenantId(), getCustomerId(), getUserId());
}
public void resolve(TenantId tenantId, CustomerId customerId, UserId userId) {
List<ListenableFuture<DynamicValueKeySub>> futures = new ArrayList<>();
for (DynamicValueKey key : dynamicValues.keySet()) {
switch (key.getSourceType()) {
case CURRENT_TENANT:
futures.add(resolveEntityValue(tenantId, tenantId, key));
break;
case CURRENT_CUSTOMER:
if (customerId != null && !customerId.isNullUid()) {
futures.add(resolveEntityValue(tenantId, customerId, key));
}
break;
case CURRENT_USER:
if (userId != null && !userId.isNullUid()) {
futures.add(resolveEntityValue(tenantId, userId, key));
}
break;
}
}
try {
Map<EntityId, Map<String, DynamicValueKeySub>> tmpSubMap = new HashMap<>();
for (DynamicValueKeySub sub : Futures.successfulAsList(futures).get()) {
tmpSubMap.computeIfAbsent(sub.getEntityId(), tmp -> new HashMap<>()).put(sub.getKey().getSourceAttribute(), sub);
}
for (EntityId entityId : tmpSubMap.keySet()) {
Map<String, Long> keyStates = new HashMap<>();
Map<String, DynamicValueKeySub> dynamicValueKeySubMap = tmpSubMap.get(entityId);
dynamicValueKeySubMap.forEach((k, v) -> keyStates.put(k, v.getLastUpdateTs()));
int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
TbAttributeSubscription sub = TbAttributeSubscription.builder()
.serviceId(serviceId)
.sessionId(sessionRef.getSessionId())
.subscriptionId(subIdx)
.tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId)
.updateConsumer((s, subscriptionUpdate) -> dynamicValueSubUpdate(s, subscriptionUpdate, dynamicValueKeySubMap))
.allKeys(false)
.keyStates(keyStates)
.scope(TbAttributeSubscriptionScope.SERVER_SCOPE)
.build();
subToDynamicValueKeySet.add(subIdx);
localSubscriptionService.addSubscription(sub);
}
} catch (InterruptedException | ExecutionException e) {
log.info("[{}][{}][{}] Failed to resolve dynamic values: {}", tenantId, customerId, userId, dynamicValues.keySet());
}
}
private void dynamicValueSubUpdate(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate,
Map<String, DynamicValueKeySub> dynamicValueKeySubMap) {
Map<String, TsValue> latestUpdate = new HashMap<>();
subscriptionUpdate.getData().forEach((k, v) -> {
Object[] data = (Object[]) v.get(0);
latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1]));
});
boolean invalidateFilter = false;
for (Map.Entry<String, TsValue> entry : latestUpdate.entrySet()) {
String k = entry.getKey();
TsValue tsValue = entry.getValue();
DynamicValueKeySub sub = dynamicValueKeySubMap.get(k);
if (sub.updateValue(tsValue)) {
invalidateFilter = true;
updateDynamicValuesByKey(sub, tsValue);
}
}
if (invalidateFilter) {
update();
}
}
public void fetchData() {
@ -231,104 +100,10 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
return data.getData();
}
@Data
private static class DynamicValueKeySub {
private final DynamicValueKey key;
private final EntityId entityId;
private long lastUpdateTs;
private String lastUpdateValue;
boolean updateValue(TsValue value) {
if (value.getTs() > lastUpdateTs && (lastUpdateValue == null || !lastUpdateValue.equals(value.getValue()))) {
this.lastUpdateTs = value.getTs();
this.lastUpdateValue = value.getValue();
return true;
} else {
return false;
}
}
}
private ListenableFuture<DynamicValueKeySub> resolveEntityValue(TenantId tenantId, EntityId entityId, DynamicValueKey key) {
ListenableFuture<Optional<AttributeKvEntry>> entry = attributesService.find(tenantId, entityId,
TbAttributeSubscriptionScope.SERVER_SCOPE.name(), key.getSourceAttribute());
return Futures.transform(entry, attributeOpt -> {
DynamicValueKeySub sub = new DynamicValueKeySub(key, entityId);
if (attributeOpt.isPresent()) {
AttributeKvEntry attribute = attributeOpt.get();
sub.setLastUpdateTs(attribute.getLastUpdateTs());
sub.setLastUpdateValue(attribute.getValueAsString());
updateDynamicValuesByKey(sub, new TsValue(attribute.getLastUpdateTs(), attribute.getValueAsString()));
}
return sub;
}, MoreExecutors.directExecutor());
}
@SuppressWarnings("unchecked")
private void updateDynamicValuesByKey(DynamicValueKeySub sub, TsValue tsValue) {
DynamicValueKey dvk = sub.getKey();
switch (dvk.getPredicateType()) {
case STRING:
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(tsValue.getValue()));
break;
case NUMERIC:
try {
Double dValue = Double.parseDouble(tsValue.getValue());
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(dValue));
} catch (NumberFormatException e) {
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(null));
}
break;
case BOOLEAN:
Boolean bValue = Boolean.parseBoolean(tsValue.getValue());
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(bValue));
break;
}
}
@SuppressWarnings("unchecked")
private void registerDynamicValues(KeyFilterPredicate predicate) {
switch (predicate.getType()) {
case STRING:
case NUMERIC:
case BOOLEAN:
Optional<DynamicValue> value = getDynamicValueFromSimplePredicate((SimpleKeyFilterPredicate) predicate);
if (value.isPresent()) {
DynamicValue dynamicValue = value.get();
DynamicValueKey key = new DynamicValueKey(
predicate.getType(),
dynamicValue.getSourceType(),
dynamicValue.getSourceAttribute());
dynamicValues.computeIfAbsent(key, tmp -> new ArrayList<>()).add(dynamicValue);
}
break;
case COMPLEX:
((ComplexFilterPredicate) predicate).getPredicates().forEach(this::registerDynamicValues);
}
}
private Optional<DynamicValue<T>> getDynamicValueFromSimplePredicate(SimpleKeyFilterPredicate<T> predicate) {
if (predicate.getValue().getUserValue() == null) {
return Optional.ofNullable(predicate.getValue().getDynamicValue());
} else {
return Optional.empty();
}
}
public String getSessionId() {
return sessionRef.getSessionId();
}
public TenantId getTenantId() {
return sessionRef.getSecurityCtx().getTenantId();
}
public CustomerId getCustomerId() {
return sessionRef.getSecurityCtx().getCustomerId();
}
public UserId getUserId() {
return sessionRef.getSecurityCtx().getId();
@Override
public void clearSubscriptions() {
clearEntitySubscriptions();
super.clearSubscriptions();
}
public void clearEntitySubscriptions() {
@ -340,26 +115,6 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
}
}
public void clearDynamicValueSubscriptions() {
if (subToDynamicValueKeySet != null) {
for (Integer subId : subToDynamicValueKeySet) {
localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId);
}
subToDynamicValueKeySet.clear();
}
}
public void setRefreshTask(ScheduledFuture<?> task) {
this.refreshTask = task;
}
public void cancelTasks() {
if (this.refreshTask != null) {
log.trace("[{}][{}] Canceling old refresh task", sessionRef.getSessionId(), cmdId);
this.refreshTask.cancel(true);
}
}
public void createSubscriptions(List<EntityKey> keys, boolean resultToLatestValues) {
Map<EntityKeyType, List<EntityKey>> keysByType = getEntityKeyByTypeMap(keys);
for (EntityData entityData : data.getData()) {
@ -459,14 +214,4 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
abstract void sendWsMsg(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType, boolean resultToLatestValues);
@Data
private static class DynamicValueKey {
@Getter
private final FilterPredicateType predicateType;
@Getter
private final DynamicValueSourceType sourceType;
@Getter
private final String sourceAttribute;
}
}

View File

@ -0,0 +1,315 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.subscription;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.query.ComplexFilterPredicate;
import org.thingsboard.server.common.data.query.DynamicValue;
import org.thingsboard.server.common.data.query.DynamicValueSourceType;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.FilterPredicateType;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.data.query.KeyFilterPredicate;
import org.thingsboard.server.common.data.query.SimpleKeyFilterPredicate;
import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
@Slf4j
@Data
public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
protected final String serviceId;
protected final SubscriptionServiceStatistics stats;
protected final TelemetryWebSocketService wsService;
protected final EntityService entityService;
protected final TbLocalSubscriptionService localSubscriptionService;
protected final AttributesService attributesService;
protected final TelemetryWebSocketSessionRef sessionRef;
protected final int cmdId;
protected final Set<Integer> subToDynamicValueKeySet;
@Getter
protected final Map<DynamicValueKey, List<DynamicValue>> dynamicValues;
@Getter
@Setter
protected T query;
@Setter
protected volatile ScheduledFuture<?> refreshTask;
public TbAbstractSubCtx(String serviceId, TelemetryWebSocketService wsService,
EntityService entityService, TbLocalSubscriptionService localSubscriptionService,
AttributesService attributesService, SubscriptionServiceStatistics stats,
TelemetryWebSocketSessionRef sessionRef, int cmdId) {
this.serviceId = serviceId;
this.wsService = wsService;
this.entityService = entityService;
this.localSubscriptionService = localSubscriptionService;
this.attributesService = attributesService;
this.stats = stats;
this.sessionRef = sessionRef;
this.cmdId = cmdId;
this.subToDynamicValueKeySet = ConcurrentHashMap.newKeySet();
this.dynamicValues = new ConcurrentHashMap<>();
}
public void setAndResolveQuery(T query) {
dynamicValues.clear();
this.query = query;
if (query != null && query.getKeyFilters() != null) {
for (KeyFilter filter : query.getKeyFilters()) {
registerDynamicValues(filter.getPredicate());
}
}
resolve(getTenantId(), getCustomerId(), getUserId());
}
public void resolve(TenantId tenantId, CustomerId customerId, UserId userId) {
List<ListenableFuture<DynamicValueKeySub>> futures = new ArrayList<>();
for (DynamicValueKey key : dynamicValues.keySet()) {
switch (key.getSourceType()) {
case CURRENT_TENANT:
futures.add(resolveEntityValue(tenantId, tenantId, key));
break;
case CURRENT_CUSTOMER:
if (customerId != null && !customerId.isNullUid()) {
futures.add(resolveEntityValue(tenantId, customerId, key));
}
break;
case CURRENT_USER:
if (userId != null && !userId.isNullUid()) {
futures.add(resolveEntityValue(tenantId, userId, key));
}
break;
}
}
try {
Map<EntityId, Map<String, DynamicValueKeySub>> tmpSubMap = new HashMap<>();
for (DynamicValueKeySub sub : Futures.successfulAsList(futures).get()) {
tmpSubMap.computeIfAbsent(sub.getEntityId(), tmp -> new HashMap<>()).put(sub.getKey().getSourceAttribute(), sub);
}
for (EntityId entityId : tmpSubMap.keySet()) {
Map<String, Long> keyStates = new HashMap<>();
Map<String, DynamicValueKeySub> dynamicValueKeySubMap = tmpSubMap.get(entityId);
dynamicValueKeySubMap.forEach((k, v) -> keyStates.put(k, v.getLastUpdateTs()));
int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
TbAttributeSubscription sub = TbAttributeSubscription.builder()
.serviceId(serviceId)
.sessionId(sessionRef.getSessionId())
.subscriptionId(subIdx)
.tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId)
.updateConsumer((s, subscriptionUpdate) -> dynamicValueSubUpdate(s, subscriptionUpdate, dynamicValueKeySubMap))
.allKeys(false)
.keyStates(keyStates)
.scope(TbAttributeSubscriptionScope.SERVER_SCOPE)
.build();
subToDynamicValueKeySet.add(subIdx);
localSubscriptionService.addSubscription(sub);
}
} catch (InterruptedException | ExecutionException e) {
log.info("[{}][{}][{}] Failed to resolve dynamic values: {}", tenantId, customerId, userId, dynamicValues.keySet());
}
}
private void dynamicValueSubUpdate(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate,
Map<String, DynamicValueKeySub> dynamicValueKeySubMap) {
Map<String, TsValue> latestUpdate = new HashMap<>();
subscriptionUpdate.getData().forEach((k, v) -> {
Object[] data = (Object[]) v.get(0);
latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1]));
});
boolean invalidateFilter = false;
for (Map.Entry<String, TsValue> entry : latestUpdate.entrySet()) {
String k = entry.getKey();
TsValue tsValue = entry.getValue();
DynamicValueKeySub sub = dynamicValueKeySubMap.get(k);
if (sub.updateValue(tsValue)) {
invalidateFilter = true;
updateDynamicValuesByKey(sub, tsValue);
}
}
if (invalidateFilter) {
update();
}
}
public abstract void fetchData();
protected abstract void update();
public void clearSubscriptions() {
clearDynamicValueSubscriptions();
}
@Data
private static class DynamicValueKeySub {
private final DynamicValueKey key;
private final EntityId entityId;
private long lastUpdateTs;
private String lastUpdateValue;
boolean updateValue(TsValue value) {
if (value.getTs() > lastUpdateTs && (lastUpdateValue == null || !lastUpdateValue.equals(value.getValue()))) {
this.lastUpdateTs = value.getTs();
this.lastUpdateValue = value.getValue();
return true;
} else {
return false;
}
}
}
private ListenableFuture<DynamicValueKeySub> resolveEntityValue(TenantId tenantId, EntityId entityId, DynamicValueKey key) {
ListenableFuture<Optional<AttributeKvEntry>> entry = attributesService.find(tenantId, entityId,
TbAttributeSubscriptionScope.SERVER_SCOPE.name(), key.getSourceAttribute());
return Futures.transform(entry, attributeOpt -> {
DynamicValueKeySub sub = new DynamicValueKeySub(key, entityId);
if (attributeOpt.isPresent()) {
AttributeKvEntry attribute = attributeOpt.get();
sub.setLastUpdateTs(attribute.getLastUpdateTs());
sub.setLastUpdateValue(attribute.getValueAsString());
updateDynamicValuesByKey(sub, new TsValue(attribute.getLastUpdateTs(), attribute.getValueAsString()));
}
return sub;
}, MoreExecutors.directExecutor());
}
@SuppressWarnings("unchecked")
protected void updateDynamicValuesByKey(DynamicValueKeySub sub, TsValue tsValue) {
DynamicValueKey dvk = sub.getKey();
switch (dvk.getPredicateType()) {
case STRING:
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(tsValue.getValue()));
break;
case NUMERIC:
try {
Double dValue = Double.parseDouble(tsValue.getValue());
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(dValue));
} catch (NumberFormatException e) {
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(null));
}
break;
case BOOLEAN:
Boolean bValue = Boolean.parseBoolean(tsValue.getValue());
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(bValue));
break;
}
}
@SuppressWarnings("unchecked")
private void registerDynamicValues(KeyFilterPredicate predicate) {
switch (predicate.getType()) {
case STRING:
case NUMERIC:
case BOOLEAN:
Optional<DynamicValue> value = getDynamicValueFromSimplePredicate((SimpleKeyFilterPredicate) predicate);
if (value.isPresent()) {
DynamicValue dynamicValue = value.get();
DynamicValueKey key = new DynamicValueKey(
predicate.getType(),
dynamicValue.getSourceType(),
dynamicValue.getSourceAttribute());
dynamicValues.computeIfAbsent(key, tmp -> new ArrayList<>()).add(dynamicValue);
}
break;
case COMPLEX:
((ComplexFilterPredicate) predicate).getPredicates().forEach(this::registerDynamicValues);
}
}
private Optional<DynamicValue<T>> getDynamicValueFromSimplePredicate(SimpleKeyFilterPredicate<T> predicate) {
if (predicate.getValue().getUserValue() == null) {
return Optional.ofNullable(predicate.getValue().getDynamicValue());
} else {
return Optional.empty();
}
}
public String getSessionId() {
return sessionRef.getSessionId();
}
public TenantId getTenantId() {
return sessionRef.getSecurityCtx().getTenantId();
}
public CustomerId getCustomerId() {
return sessionRef.getSecurityCtx().getCustomerId();
}
public UserId getUserId() {
return sessionRef.getSecurityCtx().getId();
}
protected void clearDynamicValueSubscriptions() {
if (subToDynamicValueKeySet != null) {
for (Integer subId : subToDynamicValueKeySet) {
localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId);
}
subToDynamicValueKeySet.clear();
}
}
public void setRefreshTask(ScheduledFuture<?> task) {
this.refreshTask = task;
}
public void cancelTasks() {
if (this.refreshTask != null) {
log.trace("[{}][{}] Canceling old refresh task", sessionRef.getSessionId(), cmdId);
this.refreshTask.cancel(true);
}
}
@Data
public static class DynamicValueKey {
@Getter
private final FilterPredicateType predicateType;
@Getter
private final DynamicValueSourceType sourceType;
@Getter
private final String sourceAttribute;
}
}

View File

@ -90,8 +90,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
AlarmDataUpdate update;
if (!entitiesMap.isEmpty()) {
long start = System.currentTimeMillis();
PageData<AlarmData> alarms = alarmService.findAlarmDataByQueryForEntities(getTenantId(), getCustomerId(),
query, getOrderedEntityIds());
PageData<AlarmData> alarms = alarmService.findAlarmDataByQueryForEntities(getTenantId(), getCustomerId(), query, getOrderedEntityIds());
long end = System.currentTimeMillis();
stats.getAlarmQueryInvocationCnt().incrementAndGet();
stats.getAlarmQueryTimeSpent().addAndGet(end - start);

View File

@ -0,0 +1,55 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.subscription;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
@Slf4j
public class TbEntityCountSubCtx extends TbAbstractSubCtx<EntityCountQuery> {
private volatile int result;
public TbEntityCountSubCtx(String serviceId, TelemetryWebSocketService wsService, EntityService entityService,
TbLocalSubscriptionService localSubscriptionService, AttributesService attributesService,
SubscriptionServiceStatistics stats, TelemetryWebSocketSessionRef sessionRef, int cmdId) {
super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId);
}
@Override
public void fetchData() {
result = (int) entityService.countEntitiesByQuery(getTenantId(), getCustomerId(), query);
wsService.sendWsMsg(sessionRef.getSessionId(), new EntityCountUpdate(cmdId, result));
}
@Override
protected void update() {
int newCount = (int) entityService.countEntitiesByQuery(getTenantId(), getCustomerId(), query);
if (newCount != result) {
result = newCount;
wsService.sendWsMsg(sessionRef.getSessionId(), new EntityCountUpdate(cmdId, result));
}
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.subscription;
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUnsubscribeCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.UnsubscribeCmd;
@ -25,6 +26,8 @@ public interface TbEntityDataSubscriptionService {
void handleCmd(TelemetryWebSocketSessionRef sessionId, EntityDataCmd cmd);
void handleCmd(TelemetryWebSocketSessionRef sessionId, EntityCountCmd cmd);
void handleCmd(TelemetryWebSocketSessionRef sessionId, AlarmDataCmd cmd);
void cancelSubscription(String sessionId, UnsubscribeCmd subscriptionId);

View File

@ -51,22 +51,22 @@ import org.thingsboard.server.service.security.ValidationResult;
import org.thingsboard.server.service.security.ValidationResultCode;
import org.thingsboard.server.service.security.model.UserPrincipal;
import org.thingsboard.server.service.security.permission.Operation;
import org.thingsboard.server.service.subscription.TbAttributeSubscription;
import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
import org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
import org.thingsboard.server.service.subscription.TbAttributeSubscription;
import org.thingsboard.server.service.subscription.TbTimeseriesSubscription;
import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
import org.thingsboard.server.service.telemetry.cmd.v1.AttributesSubscriptionCmd;
import org.thingsboard.server.service.telemetry.cmd.v1.GetHistoryCmd;
import org.thingsboard.server.service.telemetry.cmd.v1.SubscriptionCmd;
import org.thingsboard.server.service.telemetry.cmd.v1.TelemetryPluginCmd;
import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
import org.thingsboard.server.service.telemetry.cmd.v1.TimeseriesSubscriptionCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataUnsubscribeCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.CmdUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.DataUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUnsubscribeCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.UnsubscribeCmd;
import org.thingsboard.server.service.telemetry.exception.UnauthorizedException;
@ -216,12 +216,18 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
if (cmdsWrapper.getAlarmDataCmds() != null) {
cmdsWrapper.getAlarmDataCmds().forEach(cmd -> handleWsAlarmDataCmd(sessionRef, cmd));
}
if (cmdsWrapper.getEntityCountCmds() != null) {
cmdsWrapper.getEntityCountCmds().forEach(cmd -> handleWsEntityCountCmd(sessionRef, cmd));
}
if (cmdsWrapper.getEntityDataUnsubscribeCmds() != null) {
cmdsWrapper.getEntityDataUnsubscribeCmds().forEach(cmd -> handleWsDataUnsubscribeCmd(sessionRef, cmd));
}
if (cmdsWrapper.getAlarmDataUnsubscribeCmds() != null) {
cmdsWrapper.getAlarmDataUnsubscribeCmds().forEach(cmd -> handleWsDataUnsubscribeCmd(sessionRef, cmd));
}
if (cmdsWrapper.getEntityCountUnsubscribeCmds() != null) {
cmdsWrapper.getEntityCountUnsubscribeCmds().forEach(cmd -> handleWsDataUnsubscribeCmd(sessionRef, cmd));
}
}
} catch (IOException e) {
log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e);
@ -239,6 +245,16 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
}
private void handleWsEntityCountCmd(TelemetryWebSocketSessionRef sessionRef, EntityCountCmd cmd) {
String sessionId = sessionRef.getSessionId();
log.debug("[{}] Processing: {}", sessionId, cmd);
if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId)
&& validateSubscriptionCmd(sessionRef, cmd)) {
entityDataSubService.handleCmd(sessionRef, cmd);
}
}
private void handleWsAlarmDataCmd(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
String sessionId = sessionRef.getSessionId();
log.debug("[{}] Processing: {}", sessionId, cmd);
@ -264,7 +280,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
@Override
public void sendWsMsg(String sessionId, DataUpdate update) {
public void sendWsMsg(String sessionId, CmdUpdate update) {
sendWsMsg(sessionId, update.getCmdId(), update);
}
@ -679,6 +695,20 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
return true;
}
private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, EntityCountCmd cmd) {
if (cmd.getCmdId() < 0) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,
"Cmd id is negative value!");
sendWsMsg(sessionRef, update);
return false;
} else if (cmd.getQuery() == null) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, "Query is empty!");
sendWsMsg(sessionRef, update);
return false;
}
return true;
}
private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) {
if (cmd.getCmdId() < 0) {
TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST,

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.telemetry;
import org.thingsboard.server.service.telemetry.cmd.v2.CmdUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.DataUpdate;
import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
@ -29,6 +30,6 @@ public interface TelemetryWebSocketService {
void sendWsMsg(String sessionId, TelemetrySubscriptionUpdate update);
void sendWsMsg(String sessionId, DataUpdate update);
void sendWsMsg(String sessionId, CmdUpdate update);
}

View File

@ -21,6 +21,8 @@ import org.thingsboard.server.service.telemetry.cmd.v1.GetHistoryCmd;
import org.thingsboard.server.service.telemetry.cmd.v1.TimeseriesSubscriptionCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataUnsubscribeCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountUnsubscribeCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUnsubscribeCmd;
@ -46,4 +48,8 @@ public class TelemetryPluginCmdsWrapper {
private List<AlarmDataUnsubscribeCmd> alarmDataUnsubscribeCmds;
private List<EntityCountCmd> entityCountCmds;
private List<EntityCountUnsubscribeCmd> entityCountUnsubscribeCmds;
}

View File

@ -18,14 +18,14 @@ package org.thingsboard.server.service.telemetry.cmd.v2;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmData;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
import java.util.List;
@ToString
public class AlarmDataUpdate extends DataUpdate<AlarmData> {
@Getter
@ -44,8 +44,8 @@ public class AlarmDataUpdate extends DataUpdate<AlarmData> {
}
@Override
public DataUpdateType getDataUpdateType() {
return DataUpdateType.ALARM_DATA;
public CmdUpdateType getCmdUpdateType() {
return CmdUpdateType.ALARM_DATA;
}
@JsonCreator

View File

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.telemetry.cmd.v2;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class CmdUpdate {
private final int cmdId;
private final int errorCode;
private final String errorMsg;
public abstract CmdUpdateType getCmdUpdateType();
}

View File

@ -15,7 +15,8 @@
*/
package org.thingsboard.server.service.telemetry.cmd.v2;
public enum DataUpdateType {
public enum CmdUpdateType {
ENTITY_DATA,
ALARM_DATA
ALARM_DATA,
COUNT_DATA
}

View File

@ -15,24 +15,24 @@
*/
package org.thingsboard.server.service.telemetry.cmd.v2;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
import java.util.List;
@Data
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class DataUpdate<T> {
public abstract class DataUpdate<T> extends CmdUpdate {
private final int cmdId;
@Getter
private final PageData<T> data;
@Getter
private final List<T> update;
private final int errorCode;
private final String errorMsg;
public DataUpdate(int cmdId, PageData<T> data, List<T> update, int errorCode, String errorMsg) {
super(cmdId, errorCode, errorMsg);
this.data = data;
this.update = update;
}
public DataUpdate(int cmdId, PageData<T> data, List<T> update) {
this(cmdId, data, update, SubscriptionErrorCode.NO_ERROR.getCode(), null);
@ -42,5 +42,4 @@ public abstract class DataUpdate<T> {
this(cmdId, null, null, errorCode, errorMsg);
}
public abstract DataUpdateType getDataUpdateType();
}

View File

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.telemetry.cmd.v2;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityDataQuery;
public class EntityCountCmd extends DataCmd {
@Getter
private final EntityCountQuery query;
@JsonCreator
public EntityCountCmd(@JsonProperty("cmdId") int cmdId,
@JsonProperty("query") EntityCountQuery query) {
super(cmdId);
this.query = query;
}
}

View File

@ -0,0 +1,25 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.telemetry.cmd.v2;
import lombok.Data;
@Data
public class EntityCountUnsubscribeCmd implements UnsubscribeCmd {
private final int cmdId;
}

View File

@ -0,0 +1,57 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.telemetry.cmd.v2;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
import java.util.List;
@ToString
public class EntityCountUpdate extends CmdUpdate {
@Getter
private int count;
public EntityCountUpdate(int cmdId, int count) {
super(cmdId, SubscriptionErrorCode.NO_ERROR.getCode(), null);
this.count = count;
}
public EntityCountUpdate(int cmdId, int errorCode, String errorMsg) {
super(cmdId, errorCode, errorMsg);
}
@Override
public CmdUpdateType getCmdUpdateType() {
return CmdUpdateType.COUNT_DATA;
}
@JsonCreator
public EntityCountUpdate(@JsonProperty("cmdId") int cmdId,
@JsonProperty("count") int count,
@JsonProperty("errorCode") int errorCode,
@JsonProperty("errorMsg") String errorMsg) {
super(cmdId, errorCode, errorMsg);
this.count = count;
}
}

View File

@ -16,16 +16,16 @@
package org.thingsboard.server.service.telemetry.cmd.v2;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
import java.util.List;
@ToString
public class EntityDataUpdate extends DataUpdate<EntityData> {
@Getter
@ -41,8 +41,8 @@ public class EntityDataUpdate extends DataUpdate<EntityData> {
}
@Override
public DataUpdateType getDataUpdateType() {
return DataUpdateType.ENTITY_DATA;
public CmdUpdateType getCmdUpdateType() {
return CmdUpdateType.ENTITY_DATA;
}
@JsonCreator

View File

@ -15,8 +15,6 @@
*/
package org.thingsboard.server.service.telemetry.cmd.v2;
import lombok.Data;
public interface UnsubscribeCmd {
int getCmdId();

View File

@ -30,7 +30,7 @@
<!-- <logger name="org.thingsboard.server.service.queue" level="TRACE" />-->
<!-- <logger name="org.thingsboard.server.service.transport" level="TRACE" />-->
<!-- <logger name="org.thingsboard.server.queue.memory.InMemoryStorage" level="DEBUG" />-->
<!-- <logger name="org.thingsboard.server.service.ttl.AbstractCleanUpService" level="DEBUG" />-->
<!-- <logger name="org.thingsboard.server.service.subscription" level="TRACE"/>-->
<!-- <logger name="org.thingsboard.server.service.telemetry" level="TRACE"/>-->

View File

@ -322,6 +322,9 @@ actors:
cache:
# caffeine or redis
type: "${CACHE_TYPE:caffeine}"
attributes:
# make sure that if cache.type is 'redis' and cache.attributes.enabled is 'true' that you change 'maxmemory-policy' Redis config property to 'allkeys-lru', 'allkeys-lfu' or 'allkeys-random'
enabled: "${CACHE_ATTRIBUTES_ENABLED:true}"
caffeine:
specs:
@ -355,6 +358,9 @@ caffeine:
deviceProfiles:
timeToLiveInMinutes: 1440
maxSize: 0
attributes:
timeToLiveInMinutes: 1440
maxSize: 100000
redis:
# standalone or cluster
@ -539,6 +545,7 @@ transport:
http:
enabled: "${HTTP_ENABLED:true}"
request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}"
max_request_timeout: "${HTTP_MAX_REQUEST_TIMEOUT:300000}"
# Local MQTT transport parameters
mqtt:
# Enable/disable mqtt transport protocol.

View File

@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.query.EntityDataSortOrder;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.EntityListFilter;
import org.thingsboard.server.common.data.query.EntityTypeFilter;
import org.thingsboard.server.common.data.query.FilterPredicateValue;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.data.query.NumericFilterPredicate;
@ -132,6 +133,14 @@ public abstract class BaseEntityQueryControllerTest extends AbstractControllerTe
count = doPostWithResponse("/api/entitiesQuery/count", countQuery, Long.class);
Assert.assertEquals(97, count.longValue());
EntityTypeFilter filter2 = new EntityTypeFilter();
filter2.setEntityType(EntityType.DEVICE);
EntityCountQuery countQuery2 = new EntityCountQuery(filter2);
Long count2 = doPostWithResponse("/api/entitiesQuery/count", countQuery2, Long.class);
Assert.assertEquals(97, count2.longValue());
}
@Test
@ -198,11 +207,31 @@ public abstract class BaseEntityQueryControllerTest extends AbstractControllerTe
Assert.assertEquals(11, data.getTotalElements());
Assert.assertEquals("Device19", data.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue());
EntityTypeFilter filter2 = new EntityTypeFilter();
filter2.setEntityType(EntityType.DEVICE);
EntityDataSortOrder sortOrder2 = new EntityDataSortOrder(
new EntityKey(EntityKeyType.ENTITY_FIELD, "createdTime"), EntityDataSortOrder.Direction.ASC
);
EntityDataPageLink pageLink2 = new EntityDataPageLink(10, 0, null, sortOrder2);
List<EntityKey> entityFields2 = Collections.singletonList(new EntityKey(EntityKeyType.ENTITY_FIELD, "name"));
EntityDataQuery query2 = new EntityDataQuery(filter2, pageLink2, entityFields2, null, null);
PageData<EntityData> data2 =
doPostWithTypedResponse("/api/entitiesQuery/find", query2, new TypeReference<PageData<EntityData>>() {
});
Assert.assertEquals(97, data2.getTotalElements());
Assert.assertEquals(10, data2.getTotalPages());
Assert.assertTrue(data2.hasNext());
Assert.assertEquals(10, data2.getData().size());
}
@Test
public void testFindEntityDataByQueryWithAttributes() throws Exception {
List<Device> devices = new ArrayList<>();
List<Long> temperatures = new ArrayList<>();
List<Long> highTemperatures = new ArrayList<>();

View File

@ -35,16 +35,23 @@ import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.DeviceTypeFilter;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataPageLink;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.EntityKeyValueType;
import org.thingsboard.server.common.data.query.FilterPredicateValue;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.data.query.NumericFilterPredicate;
import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd;
@ -243,6 +250,98 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest {
Assert.assertEquals(new TsValue(dataPoint4.getTs(), dataPoint4.getValueAsString()), tsValues[0]);
}
@Test
public void testEntityCountWsCmd() throws Exception {
Device device = new Device();
device.setName("Device");
device.setType("default");
device.setLabel("testLabel" + (int) (Math.random() * 1000));
device = doPost("/api/device", device, Device.class);
AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(System.currentTimeMillis(), new LongDataEntry("temperature", 42L));
sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Collections.singletonList(dataPoint1));
DeviceTypeFilter dtf1 = new DeviceTypeFilter();
dtf1.setDeviceNameFilter("D");
dtf1.setDeviceType("default");
EntityCountQuery edq1 = new EntityCountQuery(dtf1, Collections.emptyList());
EntityCountCmd cmd1 = new EntityCountCmd(1, edq1);
TelemetryPluginCmdsWrapper wrapper1 = new TelemetryPluginCmdsWrapper();
wrapper1.setEntityCountCmds(Collections.singletonList(cmd1));
wsClient.send(mapper.writeValueAsString(wrapper1));
String msg1 = wsClient.waitForReply();
EntityCountUpdate update1 = mapper.readValue(msg1, EntityCountUpdate.class);
Assert.assertEquals(1, update1.getCmdId());
Assert.assertEquals(1, update1.getCount());
DeviceTypeFilter dtf2 = new DeviceTypeFilter();
dtf2.setDeviceNameFilter("D");
dtf2.setDeviceType("non-existing-device-type");
EntityCountQuery edq2 = new EntityCountQuery(dtf2, Collections.emptyList());
EntityCountCmd cmd2 = new EntityCountCmd(2, edq2);
TelemetryPluginCmdsWrapper wrapper2 = new TelemetryPluginCmdsWrapper();
wrapper2.setEntityCountCmds(Collections.singletonList(cmd2));
wsClient.send(mapper.writeValueAsString(wrapper2));
String msg2 = wsClient.waitForReply();
EntityCountUpdate update2 = mapper.readValue(msg2, EntityCountUpdate.class);
Assert.assertEquals(2, update2.getCmdId());
Assert.assertEquals(0, update2.getCount());
KeyFilter highTemperatureFilter = new KeyFilter();
highTemperatureFilter.setKey(new EntityKey(EntityKeyType.ATTRIBUTE, "temperature"));
NumericFilterPredicate predicate = new NumericFilterPredicate();
predicate.setValue(FilterPredicateValue.fromDouble(40));
predicate.setOperation(NumericFilterPredicate.NumericOperation.GREATER);
highTemperatureFilter.setPredicate(predicate);
highTemperatureFilter.setValueType(EntityKeyValueType.NUMERIC);
DeviceTypeFilter dtf3 = new DeviceTypeFilter();
dtf3.setDeviceNameFilter("D");
dtf3.setDeviceType("default");
EntityCountQuery edq3 = new EntityCountQuery(dtf3, Collections.singletonList(highTemperatureFilter));
EntityCountCmd cmd3 = new EntityCountCmd(3, edq3);
TelemetryPluginCmdsWrapper wrapper3 = new TelemetryPluginCmdsWrapper();
wrapper3.setEntityCountCmds(Collections.singletonList(cmd3));
wsClient.send(mapper.writeValueAsString(wrapper3));
String msg3 = wsClient.waitForReply();
EntityCountUpdate update3 = mapper.readValue(msg3, EntityCountUpdate.class);
Assert.assertEquals(3, update3.getCmdId());
Assert.assertEquals(1, update3.getCount());
KeyFilter highTemperatureFilter2 = new KeyFilter();
highTemperatureFilter2.setKey(new EntityKey(EntityKeyType.ATTRIBUTE, "temperature"));
NumericFilterPredicate predicate2 = new NumericFilterPredicate();
predicate2.setValue(FilterPredicateValue.fromDouble(50));
predicate2.setOperation(NumericFilterPredicate.NumericOperation.GREATER);
highTemperatureFilter2.setPredicate(predicate2);
highTemperatureFilter2.setValueType(EntityKeyValueType.NUMERIC);
DeviceTypeFilter dtf4 = new DeviceTypeFilter();
dtf4.setDeviceNameFilter("D");
dtf4.setDeviceType("default");
EntityCountQuery edq4 = new EntityCountQuery(dtf4, Collections.singletonList(highTemperatureFilter2));
EntityCountCmd cmd4 = new EntityCountCmd(4, edq4);
TelemetryPluginCmdsWrapper wrapper4 = new TelemetryPluginCmdsWrapper();
wrapper4.setEntityCountCmds(Collections.singletonList(cmd4));
wsClient.send(mapper.writeValueAsString(wrapper4));
String msg4 = wsClient.waitForReply();
EntityCountUpdate update4 = mapper.readValue(msg4, EntityCountUpdate.class);
Assert.assertEquals(4, update4.getCmdId());
Assert.assertEquals(0, update4.getCount());
}
@Test
public void testEntityDataLatestWidgetFlow() throws Exception {
Device device = new Device();

View File

@ -27,7 +27,7 @@ import java.util.Arrays;
@RunWith(ClasspathSuite.class)
@ClasspathSuite.ClassnameFilters({
// "org.thingsboard.server.controller.sql.WebsocketApiSqlTest",
// "org.thingsboard.server.controller.sql.TenantProfileControllerSqlTest",
// "org.thingsboard.server.controller.sql.EntityQueryControllerSqlTest",
"org.thingsboard.server.controller.sql.*Test",
})
public class ControllerSqlTestSuite {

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>common</artifactId>
</parent>
<groupId>org.thingsboard.common</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>common</artifactId>
</parent>
<groupId>org.thingsboard.common</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>common</artifactId>
</parent>
<groupId>org.thingsboard.common</groupId>

View File

@ -26,4 +26,5 @@ public class CacheConstants {
public static final String SECURITY_SETTINGS_CACHE = "securitySettings";
public static final String TENANT_PROFILE_CACHE = "tenantProfiles";
public static final String DEVICE_PROFILE_CACHE = "deviceProfiles";
public static final String ATTRIBUTES_CACHE = "attributes";
}

View File

@ -19,7 +19,7 @@ import lombok.Data;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.EntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import java.util.Collections;
@ -39,7 +39,7 @@ public class AssetSearchQuery {
EntityRelationsQuery query = new EntityRelationsQuery();
query.setParameters(parameters);
query.setFilters(
Collections.singletonList(new EntityTypeFilter(relationType == null ? EntityRelation.CONTAINS_TYPE : relationType,
Collections.singletonList(new RelationEntityTypeFilter(relationType == null ? EntityRelation.CONTAINS_TYPE : relationType,
Collections.singletonList(EntityType.ASSET))));
return query;
}

View File

@ -19,7 +19,7 @@ import lombok.Data;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.EntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import java.util.Collections;
@ -36,7 +36,7 @@ public class DeviceSearchQuery {
EntityRelationsQuery query = new EntityRelationsQuery();
query.setParameters(parameters);
query.setFilters(
Collections.singletonList(new EntityTypeFilter(relationType == null ? EntityRelation.CONTAINS_TYPE : relationType,
Collections.singletonList(new RelationEntityTypeFilter(relationType == null ? EntityRelation.CONTAINS_TYPE : relationType,
Collections.singletonList(EntityType.DEVICE))));
return query;
}

View File

@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
@JsonIgnoreProperties(ignoreUnknown = true)
public class AlarmCondition {
private List<KeyFilter> condition;
private List<AlarmConditionFilter> condition;
private AlarmConditionSpec spec;
}

View File

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.device.profile;
import lombok.Data;
import org.thingsboard.server.common.data.query.EntityKeyValueType;
import org.thingsboard.server.common.data.query.KeyFilterPredicate;
@Data
public class AlarmConditionFilter {
private AlarmConditionFilterKey key;
private EntityKeyValueType valueType;
private Object value;
private KeyFilterPredicate predicate;
}

View File

@ -0,0 +1,26 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.device.profile;
import lombok.Data;
@Data
public class AlarmConditionFilterKey {
private final AlarmConditionKeyType type;
private final String key;
}

View File

@ -0,0 +1,23 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.device.profile;
public enum AlarmConditionKeyType {
ATTRIBUTE,
TIME_SERIES,
ENTITY_FIELD,
CONSTANT
}

View File

@ -19,7 +19,7 @@ import lombok.Data;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.EntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import java.util.Collections;
@ -36,7 +36,7 @@ public class EntityViewSearchQuery {
EntityRelationsQuery query = new EntityRelationsQuery();
query.setParameters(parameters);
query.setFilters(
Collections.singletonList(new EntityTypeFilter(relationType == null ? EntityRelation.CONTAINS_TYPE : relationType,
Collections.singletonList(new RelationEntityTypeFilter(relationType == null ? EntityRelation.CONTAINS_TYPE : relationType,
Collections.singletonList(EntityType.ENTITY_VIEW))));
return query;
}

View File

@ -29,15 +29,13 @@ public abstract class AbstractDataQuery<T extends EntityDataPageLink> extends En
protected List<EntityKey> entityFields;
@Getter
protected List<EntityKey> latestValues;
@Getter
protected List<KeyFilter> keyFilters;
public AbstractDataQuery() {
super();
}
public AbstractDataQuery(EntityFilter entityFilter) {
super(entityFilter);
public AbstractDataQuery(EntityFilter entityFilter, List<KeyFilter> keyFilters) {
super(entityFilter, keyFilters);
}
public AbstractDataQuery(EntityFilter entityFilter,
@ -45,11 +43,10 @@ public abstract class AbstractDataQuery<T extends EntityDataPageLink> extends En
List<EntityKey> entityFields,
List<EntityKey> latestValues,
List<KeyFilter> keyFilters) {
super(entityFilter);
super(entityFilter, keyFilters);
this.pageLink = pageLink;
this.entityFields = entityFields;
this.latestValues = latestValues;
this.keyFilters = keyFilters;
}
}

View File

@ -30,8 +30,8 @@ public class AlarmDataQuery extends AbstractDataQuery<AlarmDataPageLink> {
public AlarmDataQuery() {
}
public AlarmDataQuery(EntityFilter entityFilter) {
super(entityFilter);
public AlarmDataQuery(EntityFilter entityFilter, List<KeyFilter> keyFilters) {
super(entityFilter, keyFilters);
}
public AlarmDataQuery(EntityFilter entityFilter, AlarmDataPageLink pageLink, List<EntityKey> entityFields, List<EntityKey> latestValues, List<KeyFilter> keyFilters, List<EntityKey> alarmFields) {

View File

@ -17,14 +17,26 @@ package org.thingsboard.server.common.data.query;
import lombok.Getter;
import java.util.Collections;
import java.util.List;
public class EntityCountQuery {
@Getter
private EntityFilter entityFilter;
public EntityCountQuery() {}
@Getter
protected List<KeyFilter> keyFilters;
public EntityCountQuery() {
}
public EntityCountQuery(EntityFilter entityFilter) {
this(entityFilter, Collections.emptyList());
}
public EntityCountQuery(EntityFilter entityFilter, List<KeyFilter> keyFilters) {
this.entityFilter = entityFilter;
this.keyFilters = keyFilters;
}
}

View File

@ -27,8 +27,8 @@ public class EntityDataQuery extends AbstractDataQuery<EntityDataPageLink> {
public EntityDataQuery() {
}
public EntityDataQuery(EntityFilter entityFilter) {
super(entityFilter);
public EntityDataQuery(EntityFilter entityFilter, List<KeyFilter> keyFilters) {
super(entityFilter, keyFilters);
}
public EntityDataQuery(EntityFilter entityFilter, EntityDataPageLink pageLink, List<EntityKey> entityFields, List<EntityKey> latestValues, List<KeyFilter> keyFilters) {

View File

@ -29,6 +29,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonSubTypes.Type(value = SingleEntityFilter.class, name = "singleEntity"),
@JsonSubTypes.Type(value = EntityListFilter.class, name = "entityList"),
@JsonSubTypes.Type(value = EntityNameFilter.class, name = "entityName"),
@JsonSubTypes.Type(value = EntityTypeFilter.class, name = "entityType"),
@JsonSubTypes.Type(value = AssetTypeFilter.class, name = "assetType"),
@JsonSubTypes.Type(value = DeviceTypeFilter.class, name = "deviceType"),
@JsonSubTypes.Type(value = EntityViewTypeFilter.class, name = "entityViewType"),

View File

@ -19,6 +19,7 @@ public enum EntityFilterType {
SINGLE_ENTITY("singleEntity"),
ENTITY_LIST("entityList"),
ENTITY_NAME("entityName"),
ENTITY_TYPE("entityType"),
ASSET_TYPE("assetType"),
DEVICE_TYPE("deviceType"),
ENTITY_VIEW_TYPE("entityViewType"),

View File

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.query;
import lombok.Data;
import org.thingsboard.server.common.data.EntityType;
@Data
public class EntityTypeFilter implements EntityFilter {
@Override
public EntityFilterType getType() {
return EntityFilterType.ENTITY_TYPE;
}
private EntityType entityType;
}

View File

@ -18,8 +18,7 @@ package org.thingsboard.server.common.data.query;
import lombok.Data;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.EntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
import java.util.List;
@ -33,7 +32,7 @@ public class RelationsQueryFilter implements EntityFilter {
private EntityId rootEntity;
private EntitySearchDirection direction;
private List<EntityTypeFilter> filters;
private List<RelationEntityTypeFilter> filters;
private int maxLevel;
private boolean fetchLastLevelOnly;

View File

@ -26,6 +26,6 @@ import java.util.List;
public class EntityRelationsQuery {
private RelationsSearchParameters parameters;
private List<EntityTypeFilter> filters;
private List<RelationEntityTypeFilter> filters;
}

View File

@ -26,7 +26,7 @@ import java.util.List;
*/
@Data
@AllArgsConstructor
public class EntityTypeFilter {
public class RelationEntityTypeFilter {
private String relationType;

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>common</artifactId>
</parent>
<groupId>org.thingsboard.common</groupId>

View File

@ -120,7 +120,7 @@ public final class TbMsg implements Serializable {
private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
RuleChainId ruleChainId, RuleNodeId ruleNodeId, int ruleNodeExecCounter, TbMsgCallback callback) {
this.id = id;
this.queueName = queueName;
this.queueName = queueName != null ? queueName : ServiceQueue.MAIN;
if (ts > 0) {
this.ts = ts;
} else {

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>thingsboard</artifactId>
</parent>
<artifactId>common</artifactId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>common</artifactId>
</parent>
<groupId>org.thingsboard.common</groupId>

View File

@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>common</artifactId>
</parent>
<groupId>org.thingsboard.common</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard.common</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>transport</artifactId>
</parent>
<groupId>org.thingsboard.common.transport</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard.common</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>transport</artifactId>
</parent>
<groupId>org.thingsboard.common.transport</groupId>

View File

@ -17,9 +17,13 @@ package org.thingsboard.server.transport.http;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.coyote.ProtocolHandler;
import org.apache.coyote.http11.Http11NioProtocol;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.web.embedded.tomcat.TomcatConnectorCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.TransportContext;
@ -37,4 +41,18 @@ public class HttpTransportContext extends TransportContext {
@Value("${transport.http.request_timeout}")
private long defaultTimeout;
@Getter
@Value("${transport.http.max_request_timeout}")
private long maxRequestTimeout;
@Bean
public TomcatConnectorCustomizer tomcatAsyncTimeoutConnectorCustomizer() {
return connector -> {
ProtocolHandler handler = connector.getProtocolHandler();
if (handler instanceof Http11NioProtocol) {
log.trace("Setting async max request timeout {}", maxRequestTimeout);
connector.setAsyncTimeout(maxRequestTimeout);
}
};
}
}

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard.common</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>transport</artifactId>
</parent>
<groupId>org.thingsboard.common.transport</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>common</artifactId>
</parent>
<groupId>org.thingsboard.common</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard.common</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>transport</artifactId>
</parent>
<groupId>org.thingsboard.common.transport</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>common</artifactId>
</parent>
<groupId>org.thingsboard.common</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>thingsboard</artifactId>
</parent>
<artifactId>dao</artifactId>

View File

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.attributes;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.thingsboard.server.common.data.id.EntityId;
import java.io.Serializable;
@EqualsAndHashCode
@Getter
@AllArgsConstructor
public class AttributeCacheKey implements Serializable {
private static final long serialVersionUID = 2013369077925351881L;
private final String scope;
private final EntityId entityId;
private final String key;
@Override
public String toString() {
return entityId + "_" + scope + "_" + key;
}
}

View File

@ -0,0 +1,39 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.attributes;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.service.Validator;
public class AttributeUtils {
public static void validate(EntityId id, String scope) {
Validator.validateId(id.getId(), "Incorrect id " + id);
Validator.validateString(scope, "Incorrect scope " + scope);
}
public static void validate(AttributeKvEntry kvEntry) {
if (kvEntry == null) {
throw new IncorrectParameterException("Key value entry can't be null");
} else if (kvEntry.getDataType() == null) {
throw new IncorrectParameterException("Incorrect kvEntry. Data type can't be null");
} else {
Validator.validateString(kvEntry.getKey(), "Incorrect kvEntry. Key can't be empty");
Validator.validatePositiveNumber(kvEntry.getLastUpdateTs(), "Incorrect last update ts. Ts should be positive");
}
}
}

View File

@ -0,0 +1,63 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.attributes;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import static org.thingsboard.server.common.data.CacheConstants.ATTRIBUTES_CACHE;
@Service
@ConditionalOnProperty(prefix = "cache.attributes", value = "enabled", havingValue = "true")
@Primary
@Slf4j
public class AttributesCacheWrapper {
private final Cache attributesCache;
public AttributesCacheWrapper(CacheManager cacheManager) {
this.attributesCache = cacheManager.getCache(ATTRIBUTES_CACHE);
}
public Cache.ValueWrapper get(AttributeCacheKey attributeCacheKey) {
try {
return attributesCache.get(attributeCacheKey);
} catch (Exception e) {
log.debug("Failed to retrieve element from cache for key {}. Reason - {}.", attributeCacheKey, e.getMessage());
return null;
}
}
public void put(AttributeCacheKey attributeCacheKey, AttributeKvEntry attributeKvEntry) {
try {
attributesCache.put(attributeCacheKey, attributeKvEntry);
} catch (Exception e) {
log.debug("Failed to put element from cache for key {}. Reason - {}.", attributeCacheKey, e.getMessage());
}
}
public void evict(AttributeCacheKey attributeCacheKey) {
try {
attributesCache.evict(attributeCacheKey);
} catch (Exception e) {
log.debug("Failed to evict element from cache for key {}. Reason - {}.", attributeCacheKey, e.getMessage());
}
}
}

View File

@ -15,31 +15,39 @@
*/
package org.thingsboard.server.dao.attributes;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.service.Validator;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
/**
* @author Andrew Shvayka
*/
@Service
@ConditionalOnProperty(prefix = "cache.attributes", value = "enabled", havingValue = "false", matchIfMissing = true)
@Primary
@Slf4j
public class BaseAttributesService implements AttributesService {
private final AttributesDao attributesDao;
@Autowired
private AttributesDao attributesDao;
public BaseAttributesService(AttributesDao attributesDao) {
this.attributesDao = attributesDao;
}
@Override
public ListenableFuture<Optional<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String scope, String attributeKey) {
@ -75,33 +83,14 @@ public class BaseAttributesService implements AttributesService {
public ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope);
attributes.forEach(attribute -> validate(attribute));
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(attributes.size());
for (AttributeKvEntry attribute : attributes) {
futures.add(attributesDao.save(tenantId, entityId, scope, attribute));
}
return Futures.allAsList(futures);
List<ListenableFuture<Void>> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, scope, attribute)).collect(Collectors.toList());
return Futures.allAsList(saveFutures);
}
@Override
public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> keys) {
public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
validate(entityId, scope);
return attributesDao.removeAll(tenantId, entityId, scope, keys);
return attributesDao.removeAll(tenantId, entityId, scope, attributeKeys);
}
private static void validate(EntityId id, String scope) {
Validator.validateId(id.getId(), "Incorrect id " + id);
Validator.validateString(scope, "Incorrect scope " + scope);
}
private static void validate(AttributeKvEntry kvEntry) {
if (kvEntry == null) {
throw new IncorrectParameterException("Key value entry can't be null");
} else if (kvEntry.getDataType() == null) {
throw new IncorrectParameterException("Incorrect kvEntry. Data type can't be null");
} else {
Validator.validateString(kvEntry.getKey(), "Incorrect kvEntry. Key can't be empty");
Validator.validatePositiveNumber(kvEntry.getLastUpdateTs(), "Incorrect last update ts. Ts should be positive");
}
}
}

View File

@ -0,0 +1,193 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.attributes;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.stats.DefaultCounter;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.service.Validator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.CacheConstants.ATTRIBUTES_CACHE;
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
@Service
@ConditionalOnProperty(prefix = "cache.attributes", value = "enabled", havingValue = "true")
@Primary
@Slf4j
public class CachedAttributesService implements AttributesService {
private static final String STATS_NAME = "attributes.cache";
private final AttributesDao attributesDao;
private final AttributesCacheWrapper cacheWrapper;
private final DefaultCounter hitCounter;
private final DefaultCounter missCounter;
public CachedAttributesService(AttributesDao attributesDao,
AttributesCacheWrapper cacheWrapper,
StatsFactory statsFactory) {
this.attributesDao = attributesDao;
this.cacheWrapper = cacheWrapper;
this.hitCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "hit");
this.missCounter = statsFactory.createDefaultCounter(STATS_NAME, "result", "miss");
}
@Override
public ListenableFuture<Optional<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String scope, String attributeKey) {
validate(entityId, scope);
Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey);
AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, attributeKey);
Cache.ValueWrapper cachedAttributeValue = cacheWrapper.get(attributeCacheKey);
if (cachedAttributeValue != null) {
hitCounter.increment();
AttributeKvEntry cachedAttributeKvEntry = (AttributeKvEntry) cachedAttributeValue.get();
return Futures.immediateFuture(Optional.ofNullable(cachedAttributeKvEntry));
} else {
missCounter.increment();
ListenableFuture<Optional<AttributeKvEntry>> result = attributesDao.find(tenantId, entityId, scope, attributeKey);
return Futures.transform(result, foundAttrKvEntry -> {
// TODO: think if it's a good idea to store 'empty' attributes
cacheWrapper.put(attributeCacheKey, foundAttrKvEntry.orElse(null));
return foundAttrKvEntry;
}, MoreExecutors.directExecutor());
}
}
@Override
public ListenableFuture<List<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, String scope, Collection<String> attributeKeys) {
validate(entityId, scope);
attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey));
Map<String, Cache.ValueWrapper> wrappedCachedAttributes = findCachedAttributes(entityId, scope, attributeKeys);
List<AttributeKvEntry> cachedAttributes = wrappedCachedAttributes.values().stream()
.map(wrappedCachedAttribute -> (AttributeKvEntry) wrappedCachedAttribute.get())
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (wrappedCachedAttributes.size() == attributeKeys.size()) {
return Futures.immediateFuture(cachedAttributes);
}
Set<String> notFoundAttributeKeys = new HashSet<>(attributeKeys);
notFoundAttributeKeys.removeAll(wrappedCachedAttributes.keySet());
ListenableFuture<List<AttributeKvEntry>> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
return Futures.transform(result, foundInDbAttributes -> mergeDbAndCacheAttributes(entityId, scope, cachedAttributes, notFoundAttributeKeys, foundInDbAttributes), MoreExecutors.directExecutor());
}
private Map<String, Cache.ValueWrapper> findCachedAttributes(EntityId entityId, String scope, Collection<String> attributeKeys) {
Map<String, Cache.ValueWrapper> cachedAttributes = new HashMap<>();
for (String attributeKey : attributeKeys) {
Cache.ValueWrapper cachedAttributeValue = cacheWrapper.get(new AttributeCacheKey(scope, entityId, attributeKey));
if (cachedAttributeValue != null) {
hitCounter.increment();
cachedAttributes.put(attributeKey, cachedAttributeValue);
} else {
missCounter.increment();
}
}
return cachedAttributes;
}
private List<AttributeKvEntry> mergeDbAndCacheAttributes(EntityId entityId, String scope, List<AttributeKvEntry> cachedAttributes, Set<String> notFoundAttributeKeys, List<AttributeKvEntry> foundInDbAttributes) {
for (AttributeKvEntry foundInDbAttribute : foundInDbAttributes) {
AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, foundInDbAttribute.getKey());
cacheWrapper.put(attributeCacheKey, foundInDbAttribute);
notFoundAttributeKeys.remove(foundInDbAttribute.getKey());
}
for (String key : notFoundAttributeKeys){
cacheWrapper.put(new AttributeCacheKey(scope, entityId, key), null);
}
List<AttributeKvEntry> mergedAttributes = new ArrayList<>(cachedAttributes);
mergedAttributes.addAll(foundInDbAttributes);
return mergedAttributes;
}
@Override
public ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, String scope) {
validate(entityId, scope);
return attributesDao.findAll(tenantId, entityId, scope);
}
@Override
public List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) {
return attributesDao.findAllKeysByDeviceProfileId(tenantId, deviceProfileId);
}
@Override
public List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds) {
return attributesDao.findAllKeysByEntityIds(tenantId, entityType, entityIds);
}
@Override
public ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope);
attributes.forEach(AttributeUtils::validate);
List<ListenableFuture<Void>> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, scope, attribute)).collect(Collectors.toList());
ListenableFuture<List<Void>> future = Futures.allAsList(saveFutures);
// TODO: can do if (attributesCache.get() != null) attributesCache.put() instead, but will be more twice more requests to cache
List<String> attributeKeys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList());
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), MoreExecutors.directExecutor());
return future;
}
@Override
public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
validate(entityId, scope);
ListenableFuture<List<Void>> future = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys);
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), MoreExecutors.directExecutor());
return future;
}
private void evictAttributesFromCache(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
try {
for (String attributeKey : attributeKeys) {
cacheWrapper.evict(new AttributeCacheKey(scope, entityId, attributeKey));
}
} catch (Exception e) {
log.error("[{}][{}] Failed to remove values from cache.", tenantId, entityId, e);
}
}
}

View File

@ -35,7 +35,7 @@ import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntityRelationInfo;
import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.EntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import org.thingsboard.server.dao.entity.EntityService;
@ -457,7 +457,7 @@ public class BaseRelationService implements RelationService {
//boolean fetchLastLevelOnly = true;
log.trace("Executing findByQuery [{}]", query);
RelationsSearchParameters params = query.getParameters();
final List<EntityTypeFilter> filters = query.getFilters();
final List<RelationEntityTypeFilter> filters = query.getFilters();
if (filters == null || filters.isEmpty()) {
log.debug("Filters are not set [{}]", query);
}
@ -575,8 +575,8 @@ public class BaseRelationService implements RelationService {
};
}
private boolean matchFilters(List<EntityTypeFilter> filters, EntityRelation relation, EntitySearchDirection direction) {
for (EntityTypeFilter filter : filters) {
private boolean matchFilters(List<RelationEntityTypeFilter> filters, EntityRelation relation, EntitySearchDirection direction) {
for (RelationEntityTypeFilter filter : filters) {
if (match(filter, relation, direction)) {
return true;
}
@ -584,7 +584,7 @@ public class BaseRelationService implements RelationService {
return false;
}
private boolean match(EntityTypeFilter filter, EntityRelation relation, EntitySearchDirection direction) {
private boolean match(RelationEntityTypeFilter filter, EntityRelation relation, EntitySearchDirection direction) {
if (StringUtils.isEmpty(filter.getRelationType()) || filter.getRelationType().equals(relation.getType())) {
if (filter.getEntityTypes() == null || filter.getEntityTypes().isEmpty()) {
return true;

View File

@ -40,12 +40,13 @@ import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.EntityListFilter;
import org.thingsboard.server.common.data.query.EntityNameFilter;
import org.thingsboard.server.common.data.query.EntitySearchQueryFilter;
import org.thingsboard.server.common.data.query.EntityTypeFilter;
import org.thingsboard.server.common.data.query.EntityViewSearchQueryFilter;
import org.thingsboard.server.common.data.query.EntityViewTypeFilter;
import org.thingsboard.server.common.data.query.RelationsQueryFilter;
import org.thingsboard.server.common.data.query.SingleEntityFilter;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.EntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
import java.util.Arrays;
import java.util.Collections;
@ -249,18 +250,70 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
public long countEntitiesByQuery(TenantId tenantId, CustomerId customerId, EntityCountQuery query) {
EntityType entityType = resolveEntityType(query.getEntityFilter());
QueryContext ctx = new QueryContext(new QuerySecurityContext(tenantId, customerId, entityType));
ctx.append("select count(e.id) from ");
ctx.append(addEntityTableQuery(ctx, query.getEntityFilter()));
ctx.append(" e where ");
ctx.append(buildEntityWhere(ctx, query.getEntityFilter(), Collections.emptyList()));
return transactionTemplate.execute(status -> {
long startTs = System.currentTimeMillis();
try {
return jdbcTemplate.queryForObject(ctx.getQuery(), ctx, Long.class);
} finally {
queryLog.logQuery(ctx, ctx.getQuery(), System.currentTimeMillis() - startTs);
if (query.getKeyFilters() == null || query.getKeyFilters().isEmpty()) {
ctx.append("select count(e.id) from ");
ctx.append(addEntityTableQuery(ctx, query.getEntityFilter()));
ctx.append(" e where ");
ctx.append(buildEntityWhere(ctx, query.getEntityFilter(), Collections.emptyList()));
return transactionTemplate.execute(status -> {
long startTs = System.currentTimeMillis();
try {
return jdbcTemplate.queryForObject(ctx.getQuery(), ctx, Long.class);
} finally {
queryLog.logQuery(ctx, ctx.getQuery(), System.currentTimeMillis() - startTs);
}
});
} else {
List<EntityKeyMapping> mappings = EntityKeyMapping.prepareEntityCountKeyMapping(query);
List<EntityKeyMapping> selectionMapping = mappings.stream().filter(EntityKeyMapping::isSelection)
.collect(Collectors.toList());
List<EntityKeyMapping> entityFieldsSelectionMapping = selectionMapping.stream().filter(mapping -> !mapping.isLatest())
.collect(Collectors.toList());
List<EntityKeyMapping> filterMapping = mappings.stream().filter(EntityKeyMapping::hasFilter)
.collect(Collectors.toList());
List<EntityKeyMapping> entityFieldsFiltersMapping = filterMapping.stream().filter(mapping -> !mapping.isLatest())
.collect(Collectors.toList());
List<EntityKeyMapping> allLatestMappings = mappings.stream().filter(EntityKeyMapping::isLatest)
.collect(Collectors.toList());
String entityWhereClause = DefaultEntityQueryRepository.this.buildEntityWhere(ctx, query.getEntityFilter(), entityFieldsFiltersMapping);
String latestJoinsCnt = EntityKeyMapping.buildLatestJoins(ctx, query.getEntityFilter(), entityType, allLatestMappings, true);
String entityFieldsSelection = EntityKeyMapping.buildSelections(entityFieldsSelectionMapping, query.getEntityFilter().getType(), entityType);
String entityTypeStr;
if (query.getEntityFilter().getType().equals(EntityFilterType.RELATIONS_QUERY)) {
entityTypeStr = "e.entity_type";
} else {
entityTypeStr = "'" + entityType.name() + "'";
}
});
if (!StringUtils.isEmpty(entityFieldsSelection)) {
entityFieldsSelection = String.format("e.id id, %s entity_type, %s", entityTypeStr, entityFieldsSelection);
} else {
entityFieldsSelection = String.format("e.id id, %s entity_type", entityTypeStr);
}
String fromClauseCount = String.format("from (select %s from (select %s from %s e where %s) entities %s ) result %s",
"entities.*",
entityFieldsSelection,
addEntityTableQuery(ctx, query.getEntityFilter()),
entityWhereClause,
latestJoinsCnt,
"");
String countQuery = String.format("select count(id) %s", fromClauseCount);
return transactionTemplate.execute(status -> {
long startTs = System.currentTimeMillis();
try {
return jdbcTemplate.queryForObject(countQuery, ctx, Long.class);
} finally {
queryLog.logQuery(ctx, ctx.getQuery(), System.currentTimeMillis() - startTs);
}
});
}
}
@Override
@ -436,6 +489,7 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
case ASSET_SEARCH_QUERY:
case ENTITY_VIEW_SEARCH_QUERY:
case API_USAGE_STATE:
case ENTITY_TYPE:
return "";
default:
throw new RuntimeException("Not implemented!");
@ -521,7 +575,7 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
boolean single = entityFilter.getFilters() != null && entityFilter.getFilters().size() == 1;
if (entityFilter.getFilters() != null && !entityFilter.getFilters().isEmpty()) {
int entityTypeFilterIdx = 0;
for (EntityTypeFilter etf : entityFilter.getFilters()) {
for (RelationEntityTypeFilter etf : entityFilter.getFilters()) {
String etfCondition = buildEtfCondition(ctx, etf, entityFilter.getDirection(), entityTypeFilterIdx++);
if (!etfCondition.isEmpty()) {
if (noConditions) {
@ -570,7 +624,7 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
return "( " + selectFields + from + ")";
}
private String buildEtfCondition(QueryContext ctx, EntityTypeFilter etf, EntitySearchDirection direction, int entityTypeFilterIdx) {
private String buildEtfCondition(QueryContext ctx, RelationEntityTypeFilter etf, EntitySearchDirection direction, int entityTypeFilterIdx) {
StringBuilder whereFilter = new StringBuilder();
String relationType = etf.getRelationType();
List<EntityType> entityTypes = etf.getEntityTypes();
@ -676,6 +730,8 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
return ((EntityListFilter) entityFilter).getEntityType();
case ENTITY_NAME:
return ((EntityNameFilter) entityFilter).getEntityType();
case ENTITY_TYPE:
return ((EntityTypeFilter) entityFilter).getEntityType();
case ASSET_TYPE:
case ASSET_SEARCH_QUERY:
return EntityType.ASSET;

View File

@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.query.BooleanFilterPredicate;
import org.thingsboard.server.common.data.query.ComplexFilterPredicate;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityDataSortOrder;
import org.thingsboard.server.common.data.query.EntityFilter;
@ -380,6 +381,30 @@ public class EntityKeyMapping {
return mappings;
}
public static List<EntityKeyMapping> prepareEntityCountKeyMapping(EntityCountQuery query) {
Map<EntityKey, List<KeyFilter>> filters =
query.getKeyFilters() != null ?
query.getKeyFilters().stream().collect(Collectors.groupingBy(KeyFilter::getKey)) : Collections.emptyMap();
int index = 2;
List<EntityKeyMapping> mappings = new ArrayList<>();
if (!filters.isEmpty()) {
for (EntityKey filterField : filters.keySet()) {
EntityKeyMapping mapping = new EntityKeyMapping();
mapping.setIndex(index);
mapping.setAlias(String.format("alias%s", index));
mapping.setKeyFilters(filters.get(filterField));
mapping.setLatest(!filterField.getType().equals(EntityKeyType.ENTITY_FIELD));
mapping.setSelection(false);
mapping.setEntityKey(filterField);
mappings.add(mapping);
index += 1;
}
}
return mappings;
}
private String buildAttributeSelection() {
return buildTimeSeriesOrAttrSelection(true);
}

View File

@ -19,8 +19,10 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -114,6 +116,14 @@ public class JpaPsqlTimeseriesDao extends AbstractChunkedAggregationTimeseriesDa
partitioningRepository.save(psqlPartition);
log.trace("Adding partition to Set: {}", psqlPartition);
partitions.put(psqlPartition.getStart(), psqlPartition);
} catch (DataIntegrityViolationException ex) {
log.trace("Error occurred during partition save:", ex);
if (ex.getCause() instanceof ConstraintViolationException) {
log.warn("Saving partition [{}] rejected. Timeseries data will save to the ts_kv_indefinite (DEFAULT) partition.", psqlPartition.getPartitionDate());
partitions.put(psqlPartition.getStart(), psqlPartition);
} else {
throw new RuntimeException(ex);
}
} finally {
partitionCreationLock.unlock();
}

View File

@ -15,9 +15,6 @@
*/
package org.thingsboard.server.dao.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.After;
@ -31,25 +28,48 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.*;
import org.thingsboard.server.common.data.query.AssetSearchQueryFilter;
import org.thingsboard.server.common.data.query.DeviceSearchQueryFilter;
import org.thingsboard.server.common.data.query.DeviceTypeFilter;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataPageLink;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityDataSortOrder;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.EntityListFilter;
import org.thingsboard.server.common.data.query.FilterPredicateValue;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.data.query.NumericFilterPredicate;
import org.thingsboard.server.common.data.query.RelationsQueryFilter;
import org.thingsboard.server.common.data.query.StringFilterPredicate;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.EntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.model.sqlts.ts.TsKvEntity;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.util.DaoTestUtil;
import org.thingsboard.server.dao.util.SqlDbType;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@ -140,13 +160,13 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
long count = entityService.countEntitiesByQuery(tenantId, new CustomerId(CustomerId.NULL_UUID), countQuery);
Assert.assertEquals(30, count);
filter.setFilters(Collections.singletonList(new EntityTypeFilter("Contains", Collections.singletonList(EntityType.DEVICE))));
filter.setFilters(Collections.singletonList(new RelationEntityTypeFilter("Contains", Collections.singletonList(EntityType.DEVICE))));
count = entityService.countEntitiesByQuery(tenantId, new CustomerId(CustomerId.NULL_UUID), countQuery);
Assert.assertEquals(25, count);
filter.setRootEntity(devices.get(0).getId());
filter.setDirection(EntitySearchDirection.TO);
filter.setFilters(Collections.singletonList(new EntityTypeFilter("Manages", Collections.singletonList(EntityType.TENANT))));
filter.setFilters(Collections.singletonList(new RelationEntityTypeFilter("Manages", Collections.singletonList(EntityType.TENANT))));
count = entityService.countEntitiesByQuery(tenantId, new CustomerId(CustomerId.NULL_UUID), countQuery);
Assert.assertEquals(1, count);
@ -208,7 +228,7 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
RelationsQueryFilter filter = new RelationsQueryFilter();
filter.setRootEntity(tenantId);
filter.setDirection(EntitySearchDirection.FROM);
filter.setFilters(Collections.singletonList(new EntityTypeFilter("Contains", Collections.singletonList(EntityType.DEVICE))));
filter.setFilters(Collections.singletonList(new RelationEntityTypeFilter("Contains", Collections.singletonList(EntityType.DEVICE))));
EntityDataSortOrder sortOrder = new EntityDataSortOrder(
new EntityKey(EntityKeyType.ENTITY_FIELD, "createdTime"), EntityDataSortOrder.Direction.ASC

View File

@ -26,7 +26,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.EntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import org.thingsboard.server.dao.exception.DataValidationException;
@ -221,7 +221,7 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest {
EntityRelationsQuery query = new EntityRelationsQuery();
query.setParameters(new RelationsSearchParameters(assetA, EntitySearchDirection.FROM, -1, false));
query.setFilters(Collections.singletonList(new EntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
List<EntityRelation> relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
Assert.assertEquals(3, relations.size());
Assert.assertTrue(relations.contains(relationA));
@ -255,7 +255,7 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest {
EntityRelationsQuery query = new EntityRelationsQuery();
query.setParameters(new RelationsSearchParameters(assetA, EntitySearchDirection.FROM, -1, false));
query.setFilters(Collections.singletonList(new EntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET))));
List<EntityRelation> relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get();
Assert.assertEquals(2, relations.size());
Assert.assertTrue(relations.contains(relationAB));

View File

@ -21,7 +21,7 @@
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>msa</artifactId>
</parent>
<groupId>org.thingsboard.msa</groupId>

View File

@ -1,7 +1,7 @@
{
"name": "thingsboard-js-executor",
"private": true,
"version": "3.3.0",
"version": "3.2.2",
"description": "ThingsBoard JavaScript Executor Microservice",
"main": "server.js",
"bin": "server.js",

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>msa</artifactId>
</parent>
<groupId>org.thingsboard.msa</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>thingsboard</artifactId>
</parent>
<artifactId>msa</artifactId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>msa</artifactId>
</parent>
<groupId>org.thingsboard.msa</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>msa</artifactId>
</parent>
<groupId>org.thingsboard.msa</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard.msa</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>transport</artifactId>
</parent>
<groupId>org.thingsboard.msa.transport</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard.msa</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>transport</artifactId>
</parent>
<groupId>org.thingsboard.msa.transport</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard.msa</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>transport</artifactId>
</parent>
<groupId>org.thingsboard.msa.transport</groupId>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>msa</artifactId>
</parent>
<groupId>org.thingsboard.msa</groupId>

View File

@ -1,7 +1,7 @@
{
"name": "thingsboard-web-ui",
"private": true,
"version": "3.3.0",
"version": "3.2.2",
"description": "ThingsBoard Web UI Microservice",
"main": "server.js",
"bin": "server.js",

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>msa</artifactId>
</parent>
<groupId>org.thingsboard.msa</groupId>

View File

@ -19,11 +19,11 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>thingsboard</artifactId>
</parent>
<artifactId>netty-mqtt</artifactId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Netty MQTT Client</name>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.thingsboard</groupId>
<artifactId>thingsboard</artifactId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Thingsboard</name>

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>thingsboard</artifactId>
</parent>
<artifactId>rest-client</artifactId>

View File

@ -2207,7 +2207,7 @@ public class RestClient implements ClientHttpRequestInterceptor, Closeable {
Map<String, String> params = new HashMap<>();
addPageLinkToParam(params, pageLink);
return restTemplate.exchange(
baseURL + "/api/tenantProfiles" + getUrlParams(pageLink),
baseURL + "/api/tenantProfiles?" + getUrlParams(pageLink),
HttpMethod.GET,
HttpEntity.EMPTY,
new ParameterizedTypeReference<PageData<TenantProfile>>() {
@ -2218,7 +2218,7 @@ public class RestClient implements ClientHttpRequestInterceptor, Closeable {
Map<String, String> params = new HashMap<>();
addPageLinkToParam(params, pageLink);
return restTemplate.exchange(
baseURL + "/api/tenantProfileInfos" + getUrlParams(pageLink),
baseURL + "/api/tenantProfileInfos?" + getUrlParams(pageLink),
HttpMethod.GET,
HttpEntity.EMPTY,
new ParameterizedTypeReference<PageData<EntityInfo>>() {
@ -2275,7 +2275,7 @@ public class RestClient implements ClientHttpRequestInterceptor, Closeable {
Map<String, String> params = new HashMap<>();
addPageLinkToParam(params, pageLink);
return restTemplate.exchange(
baseURL + "/api/users" + getUrlParams(pageLink),
baseURL + "/api/users?" + getUrlParams(pageLink),
HttpMethod.GET,
HttpEntity.EMPTY,
new ParameterizedTypeReference<PageData<User>>() {

View File

@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>3.2.2-SNAPSHOT</version>
<artifactId>thingsboard</artifactId>
</parent>
<artifactId>rule-engine</artifactId>

Some files were not shown because too many files have changed in this diff Show More