Merge branch 'master' of github.com:thingsboard/thingsboard

This commit is contained in:
Sergey Matvienko 2021-12-21 12:01:04 +02:00
commit 9a9b69f79c
34 changed files with 420 additions and 150 deletions

View File

@ -48,15 +48,17 @@ import org.thingsboard.server.gen.edge.v1.UserUpdateMsg;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.nullValue;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
public static final String EDGE_HOST = "localhost";
public static final int EDGE_PORT = 7070;
private IdComparator<Edge> idComparator = new IdComparator<>();
private Tenant savedTenant;
@ -68,7 +70,7 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
loginSysAdmin();
Tenant tenant = new Tenant();
tenant.setTitle("My tenant");
tenant.setTitle("My tenant for Edge");
savedTenant = doPost("/api/tenant", tenant, Tenant.class);
tenantId = savedTenant.getId();
Assert.assertNotNull(savedTenant);
@ -667,45 +669,45 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
@Test
public void testSyncEdge() throws Exception {
Edge edge = doPost("/api/edge", constructEdge("Test Edge", "test"), Edge.class);
Edge edge = doPost("/api/edge", constructEdge("Test Sync Edge", "test"), Edge.class);
Device device = new Device();
device.setName("Edge Device 1");
device.setName("Test Sync Edge Device 1");
device.setType("default");
Device savedDevice = doPost("/api/device", device, Device.class);
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/device/" + savedDevice.getId().getId().toString(), Device.class);
Asset asset = new Asset();
asset.setName("Edge Asset 1");
asset.setName("Test Sync Edge Asset 1");
asset.setType("test");
Asset savedAsset = doPost("/api/asset", asset, Asset.class);
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/asset/" + savedAsset.getId().getId().toString(), Asset.class);
EdgeImitator edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret());
EdgeImitator edgeImitator = new EdgeImitator(EDGE_HOST, EDGE_PORT, edge.getRoutingKey(), edge.getSecret());
edgeImitator.ignoreType(UserCredentialsUpdateMsg.class);
edgeImitator.expectMessageAmount(11);
edgeImitator.connect();
Assert.assertTrue(edgeImitator.waitForMessages());
assertThat(edgeImitator.waitForMessages()).as("await for messages on first connect").isTrue();
Assert.assertEquals(2, edgeImitator.findAllMessagesByType(RuleChainUpdateMsg.class).size()); // one msg during sync process, another from edge creation
Assert.assertEquals(1, edgeImitator.findAllMessagesByType(DeviceProfileUpdateMsg.class).size()); // one msg during sync process for 'default' device profile
Assert.assertEquals(1, edgeImitator.findAllMessagesByType(DeviceUpdateMsg.class).size()); // one msg once device assigned to edge
Assert.assertEquals(2, edgeImitator.findAllMessagesByType(AssetUpdateMsg.class).size()); // two msgs - one during sync process, and one more once asset assigned to edge
Assert.assertEquals(1, edgeImitator.findAllMessagesByType(UserUpdateMsg.class).size()); // one msg during sync process for tenant admin user
Assert.assertEquals(4, edgeImitator.findAllMessagesByType(AdminSettingsUpdateMsg.class).size());
assertThat(edgeImitator.findAllMessagesByType(RuleChainUpdateMsg.class)).as("one msg during sync process, another from edge creation").hasSize(2);
assertThat(edgeImitator.findAllMessagesByType(DeviceProfileUpdateMsg.class)).as("one msg during sync process for 'default' device profile").hasSize(1);
assertThat(edgeImitator.findAllMessagesByType(DeviceUpdateMsg.class)).as("one msg once device assigned to edge").hasSize(1);
assertThat(edgeImitator.findAllMessagesByType(AssetUpdateMsg.class)).as("two msgs - one during sync process, and one more once asset assigned to edge").hasSize(2);
assertThat(edgeImitator.findAllMessagesByType(UserUpdateMsg.class)).as("one msg during sync process for tenant admin user").hasSize(1);
assertThat(edgeImitator.findAllMessagesByType(AdminSettingsUpdateMsg.class)).as("admin setting update").hasSize(4);
edgeImitator.expectMessageAmount(8);
doPost("/api/edge/sync/" + edge.getId());
Assert.assertTrue(edgeImitator.waitForMessages());
assertThat(edgeImitator.waitForMessages()).as("await for messages after edge sync rest api call").isTrue();
Assert.assertEquals(1, edgeImitator.findAllMessagesByType(RuleChainUpdateMsg.class).size());
Assert.assertEquals(1, edgeImitator.findAllMessagesByType(DeviceProfileUpdateMsg.class).size());
Assert.assertEquals(1, edgeImitator.findAllMessagesByType(AssetUpdateMsg.class).size());
Assert.assertEquals(1, edgeImitator.findAllMessagesByType(UserUpdateMsg.class).size());
Assert.assertEquals(4, edgeImitator.findAllMessagesByType(AdminSettingsUpdateMsg.class).size());
assertThat(edgeImitator.findAllMessagesByType(RuleChainUpdateMsg.class)).as("rule chain msg").hasSize(1);
assertThat(edgeImitator.findAllMessagesByType(DeviceProfileUpdateMsg.class)).as("device profile msg").hasSize(1);
assertThat(edgeImitator.findAllMessagesByType(AssetUpdateMsg.class)).as("asset update msg").hasSize(1);
assertThat(edgeImitator.findAllMessagesByType(UserUpdateMsg.class)).as("user update msg").hasSize(1);
assertThat(edgeImitator.findAllMessagesByType(AdminSettingsUpdateMsg.class)).as("admin setting update msg").hasSize(4);
edgeImitator.allowIgnoredTypes();
try {
@ -720,4 +722,4 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
.andExpect(status().isOk());
}
}
}

View File

@ -767,7 +767,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
edgeImitator.expectMessageAmount(1);
doDelete("/api/alarm/" + savedAlarm.getId().getId().toString())
.andExpect(status().isOk());
Assert.assertTrue(edgeImitator.waitForMessages(1));
Assert.assertTrue(edgeImitator.waitForMessages());
latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg);
alarmUpdateMsg = (AlarmUpdateMsg) latestMessage;

View File

@ -65,6 +65,8 @@ import java.util.stream.Collectors;
@Slf4j
public class EdgeImitator {
public static final int TIMEOUT_IN_SECONDS = 30;
private String routingKey;
private String routingSecret;
@ -293,7 +295,7 @@ public class EdgeImitator {
}
public boolean waitForMessages() throws InterruptedException {
return waitForMessages(5);
return waitForMessages(TIMEOUT_IN_SECONDS);
}
public boolean waitForMessages(int timeoutInSeconds) throws InterruptedException {
@ -308,7 +310,7 @@ public class EdgeImitator {
}
public boolean waitForResponses() throws InterruptedException {
return responsesLatch.await(5, TimeUnit.SECONDS);
return responsesLatch.await(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
}
public void expectResponsesAmount(int messageAmount) {

View File

@ -102,7 +102,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, ruleEngineSettings.getTopic());
}
@Override
@ -112,7 +112,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}
@Override
@ -182,13 +182,13 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

View File

@ -43,6 +43,7 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate;
@ -63,6 +64,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -77,7 +79,8 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes) {
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueTransportNotificationSettings transportNotificationSettings) {
this.sqsSettings = sqsSettings;
this.coreSettings = coreSettings;
this.transportApiSettings = transportApiSettings;
@ -85,6 +88,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
@ -95,7 +99,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
@ -105,7 +109,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, ruleEngineSettings.getTopic());
}
@Override
@ -115,7 +119,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}
@Override
@ -139,7 +143,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getResponsesTopic());
}
@Override
@ -172,13 +176,13 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
return new TbAwsSqsConsumerTemplate<>(coreAdmin, sqsSettings, coreSettings.getOtaPackageTopic(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
}

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate;
@ -57,6 +58,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbAwsSqsSettings sqsSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -68,13 +70,15 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
TbServiceInfoProvider serviceInfoProvider,
TbAwsSqsSettings sqsSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueRemoteJsInvokeSettings jsInvokeSettings) {
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings) {
this.partitionService = partitionService;
this.coreSettings = coreSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.ruleEngineSettings = ruleEngineSettings;
this.sqsSettings = sqsSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
@ -84,17 +88,17 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, ruleEngineSettings.getTopic());
}
@Override
@ -104,7 +108,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getTopic());
return new TbAwsSqsProducerTemplate<>(notificationAdmin, sqsSettings, coreSettings.getTopic());
}
@Override

View File

@ -32,6 +32,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin;
@ -51,34 +52,39 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
private final TbAwsSqsSettings sqsSettings;
private final TbQueueCoreSettings coreSettings;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin ruleEngineAdmin;
public AwsSqsTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbAwsSqsSettings sqsSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbAwsSqsQueueAttributes sqsQueueAttributes) {
TbAwsSqsQueueAttributes sqsQueueAttributes,
TbQueueRuleEngineSettings ruleEngineSettings) {
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.sqsSettings = sqsSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.coreAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getCoreAttributes());
this.transportApiAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getTransportApiAttributes());
this.notificationAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getNotificationsAttributes());
this.ruleEngineAdmin = new TbAwsSqsAdmin(sqsSettings, sqsQueueAttributes.getRuleEngineAttributes());
}
@Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
TbAwsSqsProducerTemplate<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
TbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic());
TbAwsSqsConsumerTemplate<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
TbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings,
transportApiSettings.getResponsesTopic() + "_" + serviceInfoProvider.getServiceId(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
@ -96,7 +102,7 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbAwsSqsProducerTemplate<>(transportApiAdmin, sqsSettings, transportApiSettings.getRequestsTopic());
return new TbAwsSqsProducerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic());
}
@Override
@ -126,5 +132,8 @@ public class AwsSqsTransportQueueFactory implements TbTransportQueueFactory {
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
}
}

View File

@ -131,7 +131,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-rule-engine-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
requestBuilder.admin(ruleEngineAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -151,7 +151,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("monolith-core-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -328,5 +328,8 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (fwUpdatesAdmin != null) {
fwUpdatesAdmin.destroy();
}
}
}

View File

@ -49,6 +49,7 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
@ -65,6 +66,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbKafkaConsumerStatsService consumerStatsService;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -80,6 +82,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
TbQueueTransportApiSettings transportApiSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.partitionService = partitionService;
this.kafkaSettings = kafkaSettings;
@ -89,6 +92,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
this.transportApiSettings = transportApiSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.consumerStatsService = consumerStatsService;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@ -103,8 +107,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-core-transport-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.defaultTopic(transportNotificationSettings.getNotificationsTopic());
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -124,7 +128,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-core-rule-engine-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
requestBuilder.admin(ruleEngineAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -144,7 +148,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-core-to-core-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -192,8 +196,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportApiResponseMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-core-transport-api-producer-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.defaultTopic(transportApiSettings.getResponsesTopic());
requestBuilder.admin(transportApiAdmin);
return requestBuilder.build();
}
@ -294,5 +298,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (fwUpdatesAdmin != null) {
fwUpdatesAdmin.destroy();
}
}
}

View File

@ -46,6 +46,7 @@ import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
@ -63,6 +64,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbKafkaConsumerStatsService consumerStatsService;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -77,6 +79,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbQueueRuleEngineSettings ruleEngineSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbKafkaConsumerStatsService consumerStatsService,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.partitionService = partitionService;
this.kafkaSettings = kafkaSettings;
@ -85,6 +88,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
this.ruleEngineSettings = ruleEngineSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.consumerStatsService = consumerStatsService;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
@ -98,8 +102,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToTransportMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-transport-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.defaultTopic(transportNotificationSettings.getNotificationsTopic());
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -108,8 +112,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-to-rule-engine-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
requestBuilder.admin(ruleEngineAdmin);
return requestBuilder.build();
}
@ -119,11 +123,10 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-to-rule-engine-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(ruleEngineSettings.getTopic());
requestBuilder.admin(ruleEngineAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
@ -150,7 +153,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
requestBuilder.settings(kafkaSettings);
requestBuilder.clientId("tb-rule-engine-to-core-notifications-" + serviceInfoProvider.getServiceId());
requestBuilder.defaultTopic(coreSettings.getTopic());
requestBuilder.admin(coreAdmin);
requestBuilder.admin(notificationAdmin);
return requestBuilder.build();
}
@ -239,5 +242,8 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (fwUpdatesAdmin != null) {
fwUpdatesAdmin.destroy();
}
}
}

View File

@ -112,7 +112,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic());
}
@Override
@ -122,7 +122,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic());
}
@Override

View File

@ -46,7 +46,9 @@ import org.thingsboard.server.queue.pubsub.TbPubSubSettings;
import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
@ -61,11 +63,14 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin jsExecutorAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin ruleEngineAdmin;
public PubSubTbCoreQueueFactory(TbPubSubSettings pubSubSettings,
TbQueueCoreSettings coreSettings,
@ -73,6 +78,8 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
TbPubSubSubscriptionSettings pubSubSubscriptionSettings) {
this.pubSubSettings = pubSubSettings;
this.coreSettings = coreSettings;
@ -80,16 +87,19 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
this.jsExecutorAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getJsExecutorSettings());
this.transportApiAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getTransportApiSettings());
this.notificationAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getNotificationsSettings());
this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
@ -99,7 +109,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic());
}
@Override
@ -109,7 +119,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic());
}
@Override
@ -133,7 +143,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(transportApiAdmin, pubSubSettings, transportApiSettings.getResponsesTopic());
}
@Override
@ -195,5 +205,8 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
}
}

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.queue.pubsub.TbPubSubSubscriptionSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
@ -60,6 +61,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -72,6 +74,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbPubSubSubscriptionSettings pubSubSubscriptionSettings) {
this.pubSubSettings = pubSubSettings;
this.coreSettings = coreSettings;
@ -79,6 +82,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getCoreSettings());
this.ruleEngineAdmin = new TbPubSubAdmin(pubSubSettings, pubSubSubscriptionSettings.getRuleEngineSettings());
@ -88,28 +92,27 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, ruleEngineSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> createTbCoreMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getTopic());
return new TbPubSubProducerTemplate<>(notificationAdmin, pubSubSettings, coreSettings.getTopic());
}
@Override

View File

@ -66,13 +66,13 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
private final TbQueueTransportApiSettings transportApiSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbRabbitMqSettings rabbitMqSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
private final TbQueueAdmin jsExecutorAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
public RabbitMqMonolithQueueFactory(PartitionService partitionService, TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
@ -89,13 +89,13 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.rabbitMqSettings = rabbitMqSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs());
this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs());
this.jsExecutorAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getJsExecutorArgs());
this.transportApiAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getTransportApiArgs());
this.notificationAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getNotificationsArgs());
this.jsInvokeSettings = jsInvokeSettings;
}
@Override
@ -110,7 +110,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
}
@Override
@ -120,7 +120,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, coreSettings.getTopic());
}
@Override

View File

@ -48,6 +48,7 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
@ -63,6 +64,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -77,6 +79,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbRabbitMqQueueArguments queueArguments) {
this.rabbitMqSettings = rabbitMqSettings;
this.coreSettings = coreSettings;
@ -85,6 +88,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs());
this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs());
@ -95,7 +99,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
@ -105,7 +109,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
}
@Override
@ -115,7 +119,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, coreSettings.getTopic());
}
@Override
@ -139,7 +143,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getResponsesTopic());
}
@Override

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
@ -60,6 +61,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbRabbitMqSettings rabbitMqSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -71,6 +73,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
TbServiceInfoProvider serviceInfoProvider,
TbRabbitMqSettings rabbitMqSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbRabbitMqQueueArguments queueArguments) {
this.partitionService = partitionService;
this.coreSettings = coreSettings;
@ -78,6 +81,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
this.ruleEngineSettings = ruleEngineSettings;
this.rabbitMqSettings = rabbitMqSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs());
this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs());
@ -87,17 +91,17 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
}
@Override
@ -107,7 +111,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getTopic());
return new TbRabbitMqProducerTemplate<>(notificationAdmin, rabbitMqSettings, coreSettings.getTopic());
}
@Override

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.queue.rabbitmq.TbRabbitMqProducerTemplate;
import org.thingsboard.server.queue.rabbitmq.TbRabbitMqQueueArguments;
import org.thingsboard.server.queue.rabbitmq.TbRabbitMqSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
@ -51,6 +52,7 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory {
private final TbRabbitMqSettings rabbitMqSettings;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueCoreSettings coreSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -62,12 +64,14 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory {
TbRabbitMqSettings rabbitMqSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
TbRabbitMqQueueArguments queueArguments) {
this.transportApiSettings = transportApiSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.rabbitMqSettings = rabbitMqSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.coreAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getCoreArgs());
this.ruleEngineAdmin = new TbRabbitMqAdmin(rabbitMqSettings, queueArguments.getRuleEngineArgs());
@ -77,10 +81,10 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory {
@Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
TbRabbitMqProducerTemplate<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
TbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
new TbRabbitMqProducerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getRequestsTopic());
TbRabbitMqConsumerTemplate<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
TbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
new TbRabbitMqConsumerTemplate<>(transportApiAdmin, rabbitMqSettings,
transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId(),
msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportApiResponseMsg.parseFrom(msg.getData()), msg.getHeaders()));
@ -98,7 +102,7 @@ public class RabbitMqTransportQueueFactory implements TbTransportQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbRabbitMqProducerTemplate<>(transportApiAdmin, rabbitMqSettings, transportApiSettings.getRequestsTopic());
return new TbRabbitMqProducerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic());
}
@Override

View File

@ -109,7 +109,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
}
@Override
@ -119,7 +119,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, coreSettings.getTopic());
}
@Override

View File

@ -48,6 +48,7 @@ import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
@ -63,6 +64,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
private final PartitionService partitionService;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -77,6 +79,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbServiceBusQueueConfigs serviceBusQueueConfigs) {
this.serviceBusSettings = serviceBusSettings;
this.coreSettings = coreSettings;
@ -85,6 +88,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
this.partitionService = partitionService;
this.serviceInfoProvider = serviceInfoProvider;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
@ -95,7 +99,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
@ -105,7 +109,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
}
@Override
@ -115,7 +119,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, coreSettings.getTopic());
}
@Override
@ -139,7 +143,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiResponseProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getResponsesTopic());
}
@Override

View File

@ -45,6 +45,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
import javax.annotation.PreDestroy;
@ -60,6 +61,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbServiceBusSettings serviceBusSettings;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
private final TbQueueTransportNotificationSettings transportNotificationSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin ruleEngineAdmin;
@ -71,6 +73,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
TbServiceInfoProvider serviceInfoProvider,
TbServiceBusSettings serviceBusSettings,
TbQueueRemoteJsInvokeSettings jsInvokeSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbServiceBusQueueConfigs serviceBusQueueConfigs) {
this.partitionService = partitionService;
this.coreSettings = coreSettings;
@ -78,6 +81,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
this.ruleEngineSettings = ruleEngineSettings;
this.serviceBusSettings = serviceBusSettings;
this.jsInvokeSettings = jsInvokeSettings;
this.transportNotificationSettings = transportNotificationSettings;
this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
@ -87,17 +91,17 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
@Override
public TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> createTransportNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, transportNotificationSettings.getNotificationsTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
}
@Override
@ -107,7 +111,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getTopic());
return new TbServiceBusProducerTemplate<>(notificationAdmin, serviceBusSettings, coreSettings.getTopic());
}
@Override

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
@ -51,14 +52,17 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory
private final TbServiceBusSettings serviceBusSettings;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueCoreSettings coreSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final TbQueueAdmin coreAdmin;
private final TbQueueAdmin transportApiAdmin;
private final TbQueueAdmin notificationAdmin;
private final TbQueueAdmin ruleEngineAdmin;
public ServiceBusTransportQueueFactory(TbQueueTransportApiSettings transportApiSettings,
TbQueueTransportNotificationSettings transportNotificationSettings,
TbServiceBusSettings serviceBusSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbServiceBusQueueConfigs serviceBusQueueConfigs) {
@ -67,10 +71,12 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory
this.serviceBusSettings = serviceBusSettings;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
this.coreAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getCoreConfigs());
this.transportApiAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getTransportApiConfigs());
this.notificationAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getNotificationsConfigs());
this.ruleEngineAdmin = new TbServiceBusAdmin(serviceBusSettings, serviceBusQueueConfigs.getRuleEngineConfigs());
}
@Override
@ -96,7 +102,7 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> createRuleEngineMsgProducer() {
return new TbServiceBusProducerTemplate<>(transportApiAdmin, serviceBusSettings, transportApiSettings.getRequestsTopic());
return new TbServiceBusProducerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic());
}
@Override
@ -127,5 +133,8 @@ public class ServiceBusTransportQueueFactory implements TbTransportQueueFactory
if (notificationAdmin != null) {
notificationAdmin.destroy();
}
if (ruleEngineAdmin != null) {
ruleEngineAdmin.destroy();
}
}
}

View File

@ -36,6 +36,9 @@ import java.util.stream.Collectors;
public abstract class DaoUtil {
public static final String DEFAULT_SORT_PROPERTY = "id";
public static final Sort DEFAULT_SORT = Sort.by(Sort.Direction.ASC, DEFAULT_SORT_PROPERTY);
private DaoUtil() {
}
@ -70,7 +73,7 @@ public abstract class DaoUtil {
public static Sort toSort(SortOrder sortOrder, Map<String,String> columnMap) {
if (sortOrder == null) {
return Sort.unsorted();
return DEFAULT_SORT;
} else {
String property = sortOrder.getProperty();
if (columnMap.containsKey(property)) {

View File

@ -336,12 +336,14 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
return savedDevice;
}
@Transactional
@Override
public void deleteDevice(TenantId tenantId, DeviceId deviceId) {
public void deleteDevice(final TenantId tenantId, final DeviceId deviceId) {
log.trace("Executing deleteDevice [{}]", deviceId);
validateId(deviceId, INCORRECT_DEVICE_ID + deviceId);
Device device = deviceDao.findById(tenantId, deviceId.getId());
final String deviceName = device.getName();
try {
List<EntityView> entityViews = entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(device.getTenantId(), deviceId).get();
if (entityViews != null && !entityViews.isEmpty()) {
@ -358,10 +360,10 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
}
deleteEntityRelations(tenantId, deviceId);
removeDeviceFromCacheByName(tenantId, device.getName());
removeDeviceFromCacheById(tenantId, device.getId());
deviceDao.removeById(tenantId, deviceId.getId());
removeDeviceFromCacheByName(tenantId, deviceName);
removeDeviceFromCacheById(tenantId, deviceId);
}
private void removeDeviceFromCacheByName(TenantId tenantId, String name) {

View File

@ -44,6 +44,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.thingsboard.server.common.data.ota.OtaPackageType.FIRMWARE;
public abstract class BaseOtaPackageServiceTest extends AbstractServiceTest {
@ -58,7 +59,7 @@ public abstract class BaseOtaPackageServiceTest extends AbstractServiceTest {
private static final ByteBuffer DATA = ByteBuffer.wrap(new byte[]{(int) DATA_SIZE});
private static final String URL = "http://firmware.test.org";
private IdComparator<OtaPackageInfo> idComparator = new IdComparator<>();
private final IdComparator<OtaPackageInfo> idComparator = new IdComparator<>();
private TenantId tenantId;
@ -565,7 +566,7 @@ public abstract class BaseOtaPackageServiceTest extends AbstractServiceTest {
Collections.sort(firmwares, idComparator);
Collections.sort(loadedFirmwares, idComparator);
Assert.assertEquals(firmwares, loadedFirmwares);
assertThat(firmwares).isEqualTo(loadedFirmwares);
otaPackageService.deleteOtaPackagesByTenantId(tenantId);
@ -620,7 +621,7 @@ public abstract class BaseOtaPackageServiceTest extends AbstractServiceTest {
Collections.sort(firmwares, idComparator);
Collections.sort(loadedFirmwares, idComparator);
Assert.assertEquals(firmwares, loadedFirmwares);
assertThat(firmwares).isEqualTo(loadedFirmwares);
otaPackageService.deleteOtaPackagesByTenantId(tenantId);

View File

@ -47,7 +47,7 @@
<jedis.version>3.3.0</jedis.version>
<jjwt.version>0.7.0</jjwt.version>
<slf4j.version>1.7.32</slf4j.version>
<log4j.version>2.16.0</log4j.version>
<log4j.version>2.17.0</log4j.version>
<logback.version>1.2.6</logback.version>
<rat.version>0.10</rat.version>
<cassandra.version>4.10.0</cassandra.version>

View File

@ -69,6 +69,8 @@ export default abstract class LeafletMap {
loading = false;
replaceInfoLabelMarker: Array<ReplaceInfo> = [];
markerLabelText: string;
polygonLabelText: string;
replaceInfoLabelPolygon: Array<ReplaceInfo> = [];
replaceInfoTooltipMarker: Array<ReplaceInfo> = [];
markerTooltipText: string;
drawRoutes: boolean;
@ -248,8 +250,16 @@ export default abstract class LeafletMap {
this.saveLocation(this.selectedEntity, this.convertToCustomFormat(e.layer.getLatLng())).subscribe(() => {
});
} else if (e.shape === 'tbRectangle' || e.shape === 'tbPolygon') {
// @ts-ignore
this.saveLocation(this.selectedEntity, this.convertPolygonToCustomFormat(e.layer.getLatLngs()[0])).subscribe(() => {
let coordinates;
if (e.shape === 'tbRectangle') {
// @ts-ignore
const bounds: L.LatLngBounds = e.layer.getBounds();
coordinates = [bounds.getNorthWest(), bounds.getSouthEast()];
} else {
// @ts-ignore
coordinates = e.layer.getLatLngs()[0];
}
this.saveLocation(this.selectedEntity, this.convertPolygonToCustomFormat(coordinates)).subscribe(() => {
});
}
// @ts-ignore
@ -281,7 +291,7 @@ export default abstract class LeafletMap {
result = iterator.next();
}
this.saveLocation(result.value.data, this.convertToCustomFormat(null)).subscribe(() => {});
} else if (e.shape === 'Polygon') {
} else if (e.shape === 'Polygon' || e.shape === 'Rectangle') {
const iterator = this.polygons.values();
let result = iterator.next();
while (!result.done && e.layer !== result.value.leafletPoly) {
@ -333,6 +343,7 @@ export default abstract class LeafletMap {
this.map.scrollWheelZoom.disable();
}
if (this.options.draggableMarker || this.editPolygons) {
map.pm.setGlobalOptions({ snappable: false } as L.PM.GlobalOptions);
this.addEditControl();
} else {
this.map.pm.disableDraw();
@ -764,6 +775,14 @@ export default abstract class LeafletMap {
if (coordinates.length === 1) {
coordinates = coordinates[0];
}
if (e.shape === 'Rectangle' && coordinates.length === 1) {
// @ts-ignore
const bounds: L.LatLngBounds = e.layer.getBounds();
const boundsArray = [bounds.getNorthWest(), bounds.getNorthEast(), bounds.getSouthWest(), bounds.getSouthEast()];
if (coordinates.every(point => boundsArray.find(boundPoint => boundPoint.equals(point)) !== undefined)) {
coordinates = [bounds.getNorthWest(), bounds.getSouthEast()];
}
}
this.saveLocation(data, this.convertPolygonToCustomFormat(coordinates)).subscribe(() => {});
}

View File

@ -104,6 +104,8 @@ export type MarkerSettings = {
markerImageFunction?: MarkerImageFunction;
markerOffsetX: number;
markerOffsetY: number;
tooltipOffsetX: number;
tooltipOffsetY: number;
};
export interface FormattedData {
@ -131,6 +133,11 @@ export type PolygonSettings = {
polygonStrokeWeight: number;
polygonStrokeColor: string;
polygonColor: string;
showPolygonLabel?: boolean;
polygonLabel: string;
polygonLabelColor: string;
polygonLabelText: string;
usePolygonLabelFunction: boolean;
showPolygonTooltip: boolean;
autocloseTooltip: boolean;
showTooltipAction: string;
@ -139,8 +146,11 @@ export type PolygonSettings = {
usePolygonTooltipFunction: boolean;
polygonClick: { [name: string]: actionsHandler };
usePolygonColorFunction: boolean;
usePolygonStrokeColorFunction: boolean;
polygonTooltipFunction: GenericFunction;
polygonColorFunction?: GenericFunction;
polygonStrokeColorFunction?: GenericFunction;
polygonLabelFunction?: GenericFunction;
editablePolygon: boolean;
};
@ -215,6 +225,8 @@ export const defaultSettings: any = {
yPosKeyName: 'yPos',
markerOffsetX: 0.5,
markerOffsetY: 1,
tooltipOffsetX: 0,
tooltipOffsetY: -1,
latKeyName: 'latitude',
lngKeyName: 'longitude',
polygonKeyName: 'coordinates',
@ -227,8 +239,10 @@ export const defaultSettings: any = {
showPolygon: false,
labelColor: '#000000',
color: '#FE7569',
showPolygonLabel: false,
polygonColor: '#0000ff',
polygonStrokeColor: '#fe0001',
polygonLabelColor: '#000000',
polygonOpacity: 0.5,
polygonStrokeOpacity: 1,
polygonStrokeWeight: 1,

View File

@ -265,10 +265,13 @@ export class MapWidgetController implements MapWidgetInterface {
tooltipFunction: parseFunction(settings.tooltipFunction, functionParams),
colorFunction: parseFunction(settings.colorFunction, functionParams),
colorPointFunction: parseFunction(settings.colorPointFunction, functionParams),
polygonLabelFunction: parseFunction(settings.polygonLabelFunction, functionParams),
polygonColorFunction: parseFunction(settings.polygonColorFunction, functionParams),
polygonStrokeColorFunction: parseFunction(settings.polygonStrokeColorFunction, functionParams),
polygonTooltipFunction: parseFunction(settings.polygonTooltipFunction, functionParams),
markerImageFunction: parseFunction(settings.markerImageFunction, ['data', 'images', 'dsData', 'dsIndex']),
labelColor: this.ctx.widgetConfig.color,
polygonLabelColor: this.ctx.widgetConfig.color,
polygonKeyName: settings.polKeyName ? settings.polKeyName : settings.polygonKeyName,
tooltipPattern: settings.tooltipPattern ||
'<b>${entityName}</b><br/><br/><b>Latitude:</b> ${' +

View File

@ -56,3 +56,10 @@ export function bindPopupActions(popup: L.Popup, settings: MarkerSettings | Poly
}
});
}
export function isCutPolygon(data): boolean {
if (Array.isArray(data[0]) && Array.isArray(data[0][0])) {
return true;
}
return false;
}

View File

@ -29,6 +29,15 @@
box-shadow: none;
}
.tb-polygon-label {
border: none;
background: none;
box-shadow: none;
&:before {
content: none;
}
}
.leaflet-container {
background-color: white;
}

View File

@ -33,6 +33,7 @@ import LeafletMap from './leaflet-map';
export class Marker {
leafletMarker: L.Marker;
labelOffset: L.LatLngTuple;
tooltipOffset: L.LatLngTuple;
markerOffset: L.LatLngTuple;
tooltip: L.Popup;
@ -49,9 +50,14 @@ export class Marker {
isDefined(settings.markerOffsetY) ? settings.markerOffsetY : 1,
];
this.tooltipOffset = [
isDefined(settings.tooltipOffsetX) ? settings.tooltipOffsetX : 0,
isDefined(settings.tooltipOffsetY) ? settings.tooltipOffsetY : -1,
];
this.createMarkerIcon((iconInfo) => {
this.leafletMarker.setIcon(iconInfo.icon);
this.tooltipOffset = [0, -iconInfo.size[1] * this.markerOffset[1] + 10];
this.labelOffset = [0, -iconInfo.size[1] * this.markerOffset[1] + 10];
this.updateMarkerLabel(settings);
});
@ -111,7 +117,7 @@ export class Marker {
}
settings.labelText = fillPattern(this.map.markerLabelText, this.map.replaceInfoLabelMarker, this.data);
this.leafletMarker.bindTooltip(`<div style="color: ${settings.labelColor};"><b>${settings.labelText}</b></div>`,
{ className: 'tb-marker-label', permanent: true, direction: 'top', offset: this.tooltipOffset });
{ className: 'tb-marker-label', permanent: true, direction: 'top', offset: this.labelOffset });
}
}
@ -124,7 +130,7 @@ export class Marker {
updateMarkerIcon(settings: MarkerSettings) {
this.createMarkerIcon((iconInfo) => {
this.leafletMarker.setIcon(iconInfo.icon);
this.tooltipOffset = [0, -iconInfo.size[1] * this.markerOffset[1] + 10];
this.labelOffset = [0, -iconInfo.size[1] * this.markerOffset[1] + 10];
this.updateMarkerLabel(settings);
});
}
@ -165,7 +171,7 @@ export class Marker {
iconUrl: currentImage.url,
iconSize: [width, height],
iconAnchor: [width * this.markerOffset[0], height * this.markerOffset[1]],
popupAnchor: [0, -height]
popupAnchor: [width * this.tooltipOffset[0], height * this.tooltipOffset[1]]
});
const iconInfo = {
size: [width, height],

View File

@ -15,9 +15,15 @@
///
import L, { LatLngExpression, LeafletMouseEvent } from 'leaflet';
import { createTooltip } from './maps-utils';
import { functionValueCalculator, parseWithTranslation, safeExecute } from './common-maps-utils';
import { FormattedData, PolygonSettings } from './map-models';
import { createTooltip, isCutPolygon } from './maps-utils';
import {
fillPattern,
functionValueCalculator,
parseWithTranslation,
processPattern,
safeExecute
} from './common-maps-utils';
import { FormattedData, MarkerSettings, PolygonSettings } from './map-models';
export class Polygon {
@ -26,38 +32,47 @@ export class Polygon {
data: FormattedData;
dataSources: FormattedData[];
constructor(public map, polyData: FormattedData, dataSources: FormattedData[], private settings: PolygonSettings, onDragendListener?) {
constructor(public map, data: FormattedData, dataSources: FormattedData[], private settings: PolygonSettings,
private onDragendListener?) {
this.dataSources = dataSources;
this.data = polyData;
this.data = data;
const polygonColor = this.getPolygonColor(settings);
this.leafletPoly = L.polygon(polyData[this.settings.polygonKeyName], {
const polygonStrokeColor = this.getPolygonStrokeColor(settings);
const polyData = data[this.settings.polygonKeyName];
const polyConstructor = isCutPolygon(polyData) || polyData.length > 2 ? L.polygon : L.rectangle;
this.leafletPoly = polyConstructor(polyData, {
fill: true,
fillColor: polygonColor,
color: settings.polygonStrokeColor,
color: polygonStrokeColor,
weight: settings.polygonStrokeWeight,
fillOpacity: settings.polygonOpacity,
opacity: settings.polygonStrokeOpacity,
pmIgnore: !settings.editablePolygon
}).addTo(this.map);
if (settings.editablePolygon && onDragendListener) {
this.leafletPoly.on('pm:edit', (e) => onDragendListener(e, this.data));
}
this.updateLabel(settings);
if (settings.showPolygonTooltip) {
this.tooltip = createTooltip(this.leafletPoly, settings, polyData.$datasource);
this.updateTooltip(polyData);
}
if (settings.polygonClick) {
this.leafletPoly.on('click', (event: LeafletMouseEvent) => {
for (const action in this.settings.polygonClick) {
if (typeof (this.settings.polygonClick[action]) === 'function') {
this.settings.polygonClick[action](event.originalEvent, polyData.$datasource);
}
}
});
this.tooltip = createTooltip(this.leafletPoly, settings, data.$datasource);
this.updateTooltip(data);
}
this.createEventListeners();
}
private createEventListeners() {
if (this.settings.editablePolygon && this.onDragendListener) {
this.leafletPoly.on('pm:edit', (e) => this.onDragendListener(e, this.data));
}
if (this.settings.polygonClick) {
this.leafletPoly.on('click', (event: LeafletMouseEvent) => {
for (const action in this.settings.polygonClick) {
if (typeof (this.settings.polygonClick[action]) === 'function') {
this.settings.polygonClick[action](event.originalEvent, this.data.$datasource);
}
}
});
}
}
updateTooltip(data: FormattedData) {
@ -67,13 +82,54 @@ export class Polygon {
this.tooltip.setContent(parseWithTranslation.parseTemplate(pattern, data, true));
}
updateLabel(settings: PolygonSettings) {
this.leafletPoly.unbindTooltip();
if (settings.showPolygonLabel) {
if (!this.map.polygonLabelText || settings.usePolygonLabelFunction) {
const pattern = settings.usePolygonLabelFunction ?
safeExecute(settings.polygonLabelFunction, [this.data, this.dataSources, this.data.dsIndex]) : settings.polygonLabel;
this.map.polygonLabelText = parseWithTranslation.prepareProcessPattern(pattern, true);
this.map.replaceInfoLabelPolygon = processPattern(this.map.polygonLabelText, this.data);
}
settings.polygonLabelText = fillPattern(this.map.polygonLabelText, this.map.replaceInfoLabelPolygon, this.data);
this.leafletPoly.bindTooltip(`<div style="color: ${settings.polygonLabelColor};"><b>${settings.polygonLabelText}</b></div>`,
{ className: 'tb-polygon-label', permanent: true, sticky: true, direction: 'center' })
.openTooltip(this.leafletPoly.getBounds().getCenter());
}
}
updatePolygon(data: FormattedData, dataSources: FormattedData[], settings: PolygonSettings) {
this.data = data;
this.dataSources = dataSources;
this.leafletPoly.setLatLngs(data[this.settings.polygonKeyName]);
const polyData = data[this.settings.polygonKeyName];
if (isCutPolygon(polyData) || polyData.length > 2) {
if (this.leafletPoly instanceof L.Rectangle) {
this.map.removeLayer(this.leafletPoly);
const polygonColor = this.getPolygonColor(settings);
const polygonStrokeColor = this.getPolygonStrokeColor(settings);
this.leafletPoly = L.polygon(polyData, {
fill: true,
fillColor: polygonColor,
color: polygonStrokeColor,
weight: settings.polygonStrokeWeight,
fillOpacity: settings.polygonOpacity,
opacity: settings.polygonStrokeOpacity,
pmIgnore: !settings.editablePolygon
}).addTo(this.map);
} else {
this.leafletPoly.setLatLngs(polyData);
}
} else if (polyData.length === 2) {
const bounds = new L.LatLngBounds(polyData);
// @ts-ignore
this.leafletPoly.setBounds(bounds);
}
if (settings.showPolygonTooltip) {
this.updateTooltip(this.data);
}
if (settings.showPolygonLabel) {
this.updateLabel(settings);
}
this.updatePolygonColor(settings);
}
@ -83,10 +139,11 @@ export class Polygon {
updatePolygonColor(settings: PolygonSettings) {
const polygonColor = this.getPolygonColor(settings);
const polygonStrokeColor = this.getPolygonStrokeColor(settings);
const style: L.PathOptions = {
fill: true,
fillColor: polygonColor,
color: settings.polygonStrokeColor,
color: polygonStrokeColor,
weight: settings.polygonStrokeWeight,
fillOpacity: settings.polygonOpacity,
opacity: settings.polygonStrokeOpacity
@ -107,4 +164,9 @@ export class Polygon {
return functionValueCalculator(settings.usePolygonColorFunction, settings.polygonColorFunction,
[this.data, this.dataSources, this.data.dsIndex], settings.polygonColor);
}
private getPolygonStrokeColor(settings: PolygonSettings): string | null {
return functionValueCalculator(settings.usePolygonStrokeColorFunction, settings.polygonStrokeColorFunction,
[this.data, this.dataSources, this.data.dsIndex], settings.polygonStrokeColor);
}
}

View File

@ -343,15 +343,25 @@ export const commonMapSettingsSchema =
default: 'return {x: origXPos, y: origYPos};'
},
markerOffsetX: {
title: 'Marker X offset relative to position',
title: 'Marker X offset relative to position multiplied by marker width',
type: 'number',
default: 0.5
},
markerOffsetY: {
title: 'Marker Y offset relative to position',
title: 'Marker Y offset relative to position multiplied by marker height',
type: 'number',
default: 1
},
tooltipOffsetX: {
title: 'Tooltip X offset relative to marker anchor multiplied by marker width',
type: 'number',
default: 0
},
tooltipOffsetY: {
title: 'Tooltip Y offset relative to marker anchor multiplied by marker height',
type: 'number',
default: -1
},
color: {
title: 'Color',
type: 'string'
@ -482,13 +492,15 @@ export const commonMapSettingsSchema =
condition: 'model.showTooltip === true && model.useTooltipFunction === true'
},
{
key: 'markerOffsetX',
condition: 'model.provider === "image-map"'
key: 'tooltipOffsetX',
condition: 'model.showTooltip === true'
},
{
key: 'markerOffsetY',
condition: 'model.provider === "image-map"'
key: 'tooltipOffsetY',
condition: 'model.showTooltip === true'
},
'markerOffsetX',
'markerOffsetY',
{
key: 'posFunction',
type: 'javascript',
@ -556,6 +568,25 @@ export const mapPolygonSchema =
type: 'boolean',
default: false
},
showPolygonLabel: {
title: 'Show polygon label',
type: 'boolean',
default: false
},
polygonLabel: {
title: 'Polygon label (pattern examples: \'${entityName}\', \'${entityName}: (Text ${keyName} units.)\' )',
type: 'string',
default: '${entityName}'
},
usePolygonLabelFunction: {
title: 'Use polygon label function',
type: 'boolean',
default: false
},
polygonLabelFunction: {
title: 'Polygon label function: f(data, dsData, dsIndex)',
type: 'string'
},
polygonColor: {
title: 'Polygon color',
type: 'string'
@ -607,6 +638,15 @@ export const mapPolygonSchema =
title: 'Polygon Color function: f(data, dsData, dsIndex)',
type: 'string'
},
usePolygonStrokeColorFunction: {
title: 'Use polygon stroke color function',
type: 'boolean',
default: false
},
polygonStrokeColorFunction: {
title: 'Polygon Stroke Color function: f(data, dsData, dsIndex)',
type: 'string'
}
},
required: []
},
@ -614,6 +654,21 @@ export const mapPolygonSchema =
'showPolygon',
'polygonKeyName',
'editablePolygon',
'showPolygonLabel',
{
key: 'usePolygonLabelFunction',
condition: 'model.showPolygonLabel === true'
},
{
key: 'polygonLabel',
condition: 'model.showPolygonLabel === true && model.usePolygonLabelFunction !== true'
},
{
key: 'polygonLabelFunction',
type: 'javascript',
helpId: 'widget/lib/map/label_fn',
condition: 'model.showPolygonLabel === true && model.usePolygonLabelFunction === true'
},
{
key: 'polygonColor',
type: 'color'
@ -630,6 +685,13 @@ export const mapPolygonSchema =
key: 'polygonStrokeColor',
type: 'color'
},
'usePolygonStrokeColorFunction',
{
key: 'polygonStrokeColorFunction',
helpId: 'widget/lib/map/polygon_color_fn',
type: 'javascript',
condition: 'model.usePolygonStrokeColorFunction === true'
},
'polygonStrokeOpacity',
'polygonStrokeWeight',
'showPolygonTooltip',

View File

@ -67,10 +67,10 @@ export default ThingsboardBaseComponent => class<P extends JsonFormFieldProps>
defaultValue() {
let value = JsonFormUtils.selectOrSet(this.props.form.key, this.props.model);
if (this.props.form.schema.type === 'boolean') {
if (typeof value !== 'boolean' && this.props.form.default) {
if (typeof value !== 'boolean' && typeof this.props.form.default === 'boolean') {
value = this.props.form.default;
}
if (typeof value !== 'boolean' && this.props.form.schema && this.props.form.schema.default) {
if (typeof value !== 'boolean' && this.props.form.schema && typeof this.props.form.schema.default === 'boolean') {
value = this.props.form.schema.default;
}
if (typeof value !== 'boolean' &&
@ -79,13 +79,13 @@ export default ThingsboardBaseComponent => class<P extends JsonFormFieldProps>
value = false;
}
} else if (this.props.form.schema.type === 'integer' || this.props.form.schema.type === 'number') {
if (typeof value !== 'number' && this.props.form.default) {
if (typeof value !== 'number' && typeof this.props.form.default === 'number') {
value = this.props.form.default;
}
if (typeof value !== 'number' && this.props.form.schema && this.props.form.schema.default) {
if (typeof value !== 'number' && this.props.form.schema && typeof this.props.form.schema.default === 'number') {
value = this.props.form.schema.default;
}
if (typeof value !== 'number' && this.props.form.titleMap && this.props.form.titleMap[0].value) {
if (typeof value !== 'number' && this.props.form.titleMap && typeof this.props.form.titleMap[0].value === 'number') {
value = this.props.form.titleMap[0].value;
}
if (value && typeof value === 'string') {
@ -96,13 +96,13 @@ export default ThingsboardBaseComponent => class<P extends JsonFormFieldProps>
}
}
} else {
if (!value && this.props.form.default) {
if (!value && typeof this.props.form.default !== 'undefined') {
value = this.props.form.default;
}
if (!value && this.props.form.schema && this.props.form.schema.default) {
if (!value && this.props.form.schema && typeof this.props.form.schema.default !== 'undefined') {
value = this.props.form.schema.default;
}
if (!value && this.props.form.titleMap && this.props.form.titleMap[0].value) {
if (!value && this.props.form.titleMap && typeof this.props.form.titleMap[0].value !== 'undefined') {
value = this.props.form.titleMap[0].value;
}
}