Fix backward compatibility for defaultQueueName field of device profile
This commit is contained in:
parent
6173795580
commit
e49da1035b
@ -571,6 +571,22 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
|
||||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", SCHEMA_UPDATE_SQL);
|
||||
loadSql(schemaUpdateFile, conn);
|
||||
|
||||
log.info("Loading queues...");
|
||||
try {
|
||||
if (!CollectionUtils.isEmpty(queueConfig.getQueues())) {
|
||||
queueConfig.getQueues().forEach(queueSettings -> {
|
||||
Queue queue = queueConfigToQueue(queueSettings);
|
||||
Queue existing = queueService.findQueueByTenantIdAndName(queue.getTenantId(), queue.getName());
|
||||
if (existing == null) {
|
||||
queueService.saveQueue(queue);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
systemDataLoaderService.createQueues();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
log.info("Updating device profiles...");
|
||||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", "schema_update_device_profile.sql");
|
||||
loadSql(schemaUpdateFile, conn);
|
||||
@ -628,4 +644,29 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
|
||||
return isOldSchema;
|
||||
}
|
||||
|
||||
private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) {
|
||||
Queue queue = new Queue();
|
||||
queue.setTenantId(TenantId.SYS_TENANT_ID);
|
||||
queue.setName(queueSettings.getName());
|
||||
queue.setTopic(queueSettings.getTopic());
|
||||
queue.setPollInterval(queueSettings.getPollInterval());
|
||||
queue.setPartitions(queueSettings.getPartitions());
|
||||
queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout());
|
||||
SubmitStrategy submitStrategy = new SubmitStrategy();
|
||||
submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize());
|
||||
submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType()));
|
||||
queue.setSubmitStrategy(submitStrategy);
|
||||
ProcessingStrategy processingStrategy = new ProcessingStrategy();
|
||||
processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType()));
|
||||
processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries());
|
||||
processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage());
|
||||
processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries());
|
||||
processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries());
|
||||
queue.setProcessingStrategy(processingStrategy);
|
||||
queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition());
|
||||
return queue;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -162,21 +162,6 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
break;
|
||||
case "3.3.4":
|
||||
log.info("Updating data from version 3.3.4 to 3.4.0 ...");
|
||||
log.info("Loading queues...");
|
||||
try {
|
||||
if (!CollectionUtils.isEmpty(queueConfig.getQueues())) {
|
||||
queueConfig.getQueues().forEach(queueSettings -> {
|
||||
Queue queue = queueConfigToQueue(queueSettings);
|
||||
Queue existing = queueService.findQueueByTenantIdAndName(queue.getTenantId(), queue.getName());
|
||||
if (existing == null) {
|
||||
queueService.saveQueue(queue);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
systemDataLoaderService.createQueues();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
}
|
||||
tenantsProfileQueueConfigurationUpdater.updateEntities(null);
|
||||
checkPointRuleNodesUpdater.updateEntities(null);
|
||||
break;
|
||||
@ -649,29 +634,6 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
return mainQueueConfiguration;
|
||||
}
|
||||
|
||||
private Queue queueConfigToQueue(TbRuleEngineQueueConfiguration queueSettings) {
|
||||
Queue queue = new Queue();
|
||||
queue.setTenantId(TenantId.SYS_TENANT_ID);
|
||||
queue.setName(queueSettings.getName());
|
||||
queue.setTopic(queueSettings.getTopic());
|
||||
queue.setPollInterval(queueSettings.getPollInterval());
|
||||
queue.setPartitions(queueSettings.getPartitions());
|
||||
queue.setPackProcessingTimeout(queueSettings.getPackProcessingTimeout());
|
||||
SubmitStrategy submitStrategy = new SubmitStrategy();
|
||||
submitStrategy.setBatchSize(queueSettings.getSubmitStrategy().getBatchSize());
|
||||
submitStrategy.setType(SubmitStrategyType.valueOf(queueSettings.getSubmitStrategy().getType()));
|
||||
queue.setSubmitStrategy(submitStrategy);
|
||||
ProcessingStrategy processingStrategy = new ProcessingStrategy();
|
||||
processingStrategy.setType(ProcessingStrategyType.valueOf(queueSettings.getProcessingStrategy().getType()));
|
||||
processingStrategy.setRetries(queueSettings.getProcessingStrategy().getRetries());
|
||||
processingStrategy.setFailurePercentage(queueSettings.getProcessingStrategy().getFailurePercentage());
|
||||
processingStrategy.setPauseBetweenRetries(queueSettings.getProcessingStrategy().getPauseBetweenRetries());
|
||||
processingStrategy.setMaxPauseBetweenRetries(queueSettings.getProcessingStrategy().getMaxPauseBetweenRetries());
|
||||
queue.setProcessingStrategy(processingStrategy);
|
||||
queue.setConsumerPerPartition(queueSettings.isConsumerPerPartition());
|
||||
return queue;
|
||||
}
|
||||
|
||||
private final PaginatedUpdater<String, RuleNode> checkPointRuleNodesUpdater =
|
||||
new PaginatedUpdater<>() {
|
||||
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.common.data;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
@ -78,6 +79,8 @@ public class DeviceProfile extends SearchTextBased<DeviceProfileId> implements H
|
||||
"If present, the specified queue will be used to store all unprocessed messages related to device, including telemetry, attribute updates, etc. " +
|
||||
"Otherwise, the 'Main' queue will be used to store those messages.")
|
||||
private QueueId defaultQueueId;
|
||||
|
||||
private String defaultQueueName;
|
||||
@Valid
|
||||
private transient DeviceProfileData profileData;
|
||||
@JsonIgnore
|
||||
@ -168,4 +171,13 @@ public class DeviceProfile extends SearchTextBased<DeviceProfileId> implements H
|
||||
}
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public String getDefaultQueueName() {
|
||||
return defaultQueueName;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public void setDefaultQueueName(String defaultQueueName) {
|
||||
this.defaultQueueName = defaultQueueName;
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,8 +37,10 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.queue.Queue;
|
||||
import org.thingsboard.server.dao.entity.AbstractCachedEntityService;
|
||||
import org.thingsboard.server.dao.exception.DataValidationException;
|
||||
import org.thingsboard.server.dao.queue.QueueService;
|
||||
import org.thingsboard.server.dao.service.DataValidator;
|
||||
import org.thingsboard.server.dao.service.PaginatedRemover;
|
||||
import org.thingsboard.server.dao.service.Validator;
|
||||
@ -71,6 +73,9 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService<Device
|
||||
@Autowired
|
||||
private DataValidator<DeviceProfile> deviceProfileValidator;
|
||||
|
||||
@Autowired
|
||||
private QueueService queueService;
|
||||
|
||||
private final Lock findOrCreateLock = new ReentrantLock();
|
||||
|
||||
@TransactionalEventListener(classes = DeviceProfileEvictEvent.class)
|
||||
@ -119,6 +124,12 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService<Device
|
||||
DeviceProfile oldDeviceProfile = deviceProfileValidator.validate(deviceProfile, DeviceProfile::getTenantId);
|
||||
DeviceProfile savedDeviceProfile;
|
||||
try {
|
||||
if (deviceProfile.getDefaultQueueId() == null && StringUtils.isNotEmpty(deviceProfile.getDefaultQueueName())) {
|
||||
Queue existing = queueService.findQueueByTenantIdAndName(deviceProfile.getTenantId(), deviceProfile.getDefaultQueueName());
|
||||
if (existing != null) {
|
||||
deviceProfile.setDefaultQueueId(existing.getId());
|
||||
}
|
||||
}
|
||||
savedDeviceProfile = deviceProfileDao.saveAndFlush(deviceProfile.getTenantId(), deviceProfile);
|
||||
publishEvictEvent(new DeviceProfileEvictEvent(savedDeviceProfile.getTenantId(), savedDeviceProfile.getName(),
|
||||
oldDeviceProfile != null ? oldDeviceProfile.getName() : null, savedDeviceProfile.getId(), savedDeviceProfile.isDefault()));
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user