Merge branch 'develop/3.4' into blackbox-tests-wait-for-js-executor-kafka-message
This commit is contained in:
commit
f25b47e8c7
@ -381,8 +381,8 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected void testBroadcastEntityStateChangeEventTime(EntityId entityId, TenantId tenantId, int cntTime) {
|
protected void testBroadcastEntityStateChangeEventTime(EntityId entityId, TenantId tenantId, int cntTime) {
|
||||||
ArgumentMatcher<TenantId> matcherTenantIdId = cntTime == 1 ? argument -> argument.equals(tenantId) :
|
ArgumentMatcher<TenantId> matcherTenantIdId = cntTime > 1 || tenantId == null ? argument -> argument.getClass().equals(TenantId.class) :
|
||||||
argument -> argument.getClass().equals(TenantId.class);
|
argument -> argument.equals(tenantId) ;
|
||||||
Mockito.verify(tbClusterService, times(cntTime)).broadcastEntityStateChangeEvent(Mockito.argThat(matcherTenantIdId),
|
Mockito.verify(tbClusterService, times(cntTime)).broadcastEntityStateChangeEvent(Mockito.argThat(matcherTenantIdId),
|
||||||
Mockito.any(entityId.getClass()), Mockito.any(ComponentLifecycleEvent.class));
|
Mockito.any(entityId.getClass()), Mockito.any(ComponentLifecycleEvent.class));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -93,7 +93,7 @@ public abstract class BaseTenantControllerTest extends AbstractControllerTest {
|
|||||||
Tenant tenant = new Tenant();
|
Tenant tenant = new Tenant();
|
||||||
tenant.setTitle("My tenant");
|
tenant.setTitle("My tenant");
|
||||||
|
|
||||||
Mockito.reset(tbClusterService, auditLogService);
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
Tenant savedTenant = doPost("/api/tenant", tenant, Tenant.class);
|
Tenant savedTenant = doPost("/api/tenant", tenant, Tenant.class);
|
||||||
Assert.assertNotNull(savedTenant);
|
Assert.assertNotNull(savedTenant);
|
||||||
@ -122,7 +122,7 @@ public abstract class BaseTenantControllerTest extends AbstractControllerTest {
|
|||||||
Tenant tenant = new Tenant();
|
Tenant tenant = new Tenant();
|
||||||
tenant.setTitle(RandomStringUtils.randomAlphanumeric(300));
|
tenant.setTitle(RandomStringUtils.randomAlphanumeric(300));
|
||||||
|
|
||||||
Mockito.reset(tbClusterService, auditLogService);
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
doPost("/api/tenant", tenant)
|
doPost("/api/tenant", tenant)
|
||||||
.andExpect(status().isBadRequest())
|
.andExpect(status().isBadRequest())
|
||||||
@ -161,7 +161,7 @@ public abstract class BaseTenantControllerTest extends AbstractControllerTest {
|
|||||||
public void testSaveTenantWithEmptyTitle() throws Exception {
|
public void testSaveTenantWithEmptyTitle() throws Exception {
|
||||||
loginSysAdmin();
|
loginSysAdmin();
|
||||||
|
|
||||||
Mockito.reset(tbClusterService, auditLogService);
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
Tenant tenant = new Tenant();
|
Tenant tenant = new Tenant();
|
||||||
doPost("/api/tenant", tenant)
|
doPost("/api/tenant", tenant)
|
||||||
@ -175,7 +175,7 @@ public abstract class BaseTenantControllerTest extends AbstractControllerTest {
|
|||||||
public void testSaveTenantWithInvalidEmail() throws Exception {
|
public void testSaveTenantWithInvalidEmail() throws Exception {
|
||||||
loginSysAdmin();
|
loginSysAdmin();
|
||||||
|
|
||||||
Mockito.reset(tbClusterService, auditLogService);
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
Tenant tenant = new Tenant();
|
Tenant tenant = new Tenant();
|
||||||
tenant.setTitle("My tenant");
|
tenant.setTitle("My tenant");
|
||||||
@ -212,7 +212,7 @@ public abstract class BaseTenantControllerTest extends AbstractControllerTest {
|
|||||||
Assert.assertEquals(1, pageData.getData().size());
|
Assert.assertEquals(1, pageData.getData().size());
|
||||||
tenants.addAll(pageData.getData());
|
tenants.addAll(pageData.getData());
|
||||||
|
|
||||||
Mockito.reset(tbClusterService, auditLogService);
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
int cntEntity = 56;
|
int cntEntity = 56;
|
||||||
List<ListenableFuture<Tenant>> createFutures = new ArrayList<>(56);
|
List<ListenableFuture<Tenant>> createFutures = new ArrayList<>(56);
|
||||||
@ -578,13 +578,13 @@ public abstract class BaseTenantControllerTest extends AbstractControllerTest {
|
|||||||
}
|
}
|
||||||
TenantId tenantId = cntTime == 1 ? tenant.getId() : (TenantId) createEntityId_NULL_UUID(tenant);
|
TenantId tenantId = cntTime == 1 ? tenant.getId() : (TenantId) createEntityId_NULL_UUID(tenant);
|
||||||
testBroadcastEntityStateChangeEventTime(tenantId, tenantId, cntTime);
|
testBroadcastEntityStateChangeEventTime(tenantId, tenantId, cntTime);
|
||||||
Mockito.reset(tbClusterService, auditLogService);
|
Mockito.reset(tbClusterService);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testBroadcastEntityStateChangeEventNeverTenant() {
|
private void testBroadcastEntityStateChangeEventNeverTenant() {
|
||||||
Mockito.verify(tbClusterService, never()).onTenantChange(Mockito.any(Tenant.class),
|
Mockito.verify(tbClusterService, never()).onTenantChange(Mockito.any(Tenant.class),
|
||||||
Mockito.isNull());
|
Mockito.isNull());
|
||||||
testBroadcastEntityStateChangeEventNever(createEntityId_NULL_UUID(new Tenant()));
|
testBroadcastEntityStateChangeEventNever(createEntityId_NULL_UUID(new Tenant()));
|
||||||
Mockito.reset(tbClusterService, auditLogService);
|
Mockito.reset(tbClusterService);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,24 +17,24 @@ package org.thingsboard.server.controller;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.mockito.ArgumentMatcher;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.thingsboard.server.common.data.EntityInfo;
|
import org.thingsboard.server.common.data.EntityInfo;
|
||||||
import org.thingsboard.server.common.data.Tenant;
|
import org.thingsboard.server.common.data.Tenant;
|
||||||
import org.thingsboard.server.common.data.TenantProfile;
|
import org.thingsboard.server.common.data.TenantProfile;
|
||||||
|
import org.thingsboard.server.common.data.id.TenantProfileId;
|
||||||
|
import org.thingsboard.server.common.data.page.PageData;
|
||||||
|
import org.thingsboard.server.common.data.page.PageLink;
|
||||||
|
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||||
import org.thingsboard.server.common.data.queue.ProcessingStrategy;
|
import org.thingsboard.server.common.data.queue.ProcessingStrategy;
|
||||||
import org.thingsboard.server.common.data.queue.ProcessingStrategyType;
|
import org.thingsboard.server.common.data.queue.ProcessingStrategyType;
|
||||||
import org.thingsboard.server.common.data.queue.SubmitStrategy;
|
import org.thingsboard.server.common.data.queue.SubmitStrategy;
|
||||||
import org.thingsboard.server.common.data.queue.SubmitStrategyType;
|
import org.thingsboard.server.common.data.queue.SubmitStrategyType;
|
||||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
||||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
|
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
|
||||||
import org.thingsboard.server.common.data.page.PageData;
|
|
||||||
import org.thingsboard.server.common.data.page.PageLink;
|
|
||||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
|
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
|
||||||
import org.thingsboard.server.dao.tenant.TenantProfileService;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -42,6 +42,8 @@ import java.util.List;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
|
||||||
|
|
||||||
public abstract class BaseTenantProfileControllerTest extends AbstractControllerTest {
|
public abstract class BaseTenantProfileControllerTest extends AbstractControllerTest {
|
||||||
@ -52,6 +54,9 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
@Test
|
@Test
|
||||||
public void testSaveTenantProfile() throws Exception {
|
public void testSaveTenantProfile() throws Exception {
|
||||||
loginSysAdmin();
|
loginSysAdmin();
|
||||||
|
|
||||||
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile");
|
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile");
|
||||||
TenantProfile savedTenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
|
TenantProfile savedTenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
|
||||||
Assert.assertNotNull(savedTenantProfile);
|
Assert.assertNotNull(savedTenantProfile);
|
||||||
@ -64,17 +69,28 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
Assert.assertEquals(tenantProfile.isIsolatedTbCore(), savedTenantProfile.isIsolatedTbCore());
|
Assert.assertEquals(tenantProfile.isIsolatedTbCore(), savedTenantProfile.isIsolatedTbCore());
|
||||||
Assert.assertEquals(tenantProfile.isIsolatedTbRuleEngine(), savedTenantProfile.isIsolatedTbRuleEngine());
|
Assert.assertEquals(tenantProfile.isIsolatedTbRuleEngine(), savedTenantProfile.isIsolatedTbRuleEngine());
|
||||||
|
|
||||||
|
testBroadcastEntityStateChangeEventTimeManyTimeTenantProfile(savedTenantProfile, ComponentLifecycleEvent.CREATED, 1);
|
||||||
|
|
||||||
savedTenantProfile.setName("New tenant profile");
|
savedTenantProfile.setName("New tenant profile");
|
||||||
doPost("/api/tenantProfile", savedTenantProfile, TenantProfile.class);
|
doPost("/api/tenantProfile", savedTenantProfile, TenantProfile.class);
|
||||||
TenantProfile foundTenantProfile = doGet("/api/tenantProfile/"+savedTenantProfile.getId().getId().toString(), TenantProfile.class);
|
TenantProfile foundTenantProfile = doGet("/api/tenantProfile/"+savedTenantProfile.getId().getId().toString(), TenantProfile.class);
|
||||||
Assert.assertEquals(foundTenantProfile.getName(), savedTenantProfile.getName());
|
Assert.assertEquals(foundTenantProfile.getName(), savedTenantProfile.getName());
|
||||||
|
|
||||||
|
testBroadcastEntityStateChangeEventTimeManyTimeTenantProfile(savedTenantProfile, ComponentLifecycleEvent.UPDATED, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSaveTenantProfileWithViolationOfLengthValidation() throws Exception {
|
public void testSaveTenantProfileWithViolationOfLengthValidation() throws Exception {
|
||||||
loginSysAdmin();
|
loginSysAdmin();
|
||||||
|
|
||||||
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
TenantProfile tenantProfile = this.createTenantProfile(RandomStringUtils.randomAlphabetic(300));
|
TenantProfile tenantProfile = this.createTenantProfile(RandomStringUtils.randomAlphabetic(300));
|
||||||
doPost("/api/tenantProfile", tenantProfile).andExpect(statusReason(containsString("length of name must be equal or less than 255")));
|
doPost("/api/tenantProfile", tenantProfile)
|
||||||
|
.andExpect(status().isBadRequest())
|
||||||
|
.andExpect(statusReason(containsString(msgErrorFieldLength("name"))));
|
||||||
|
|
||||||
|
testBroadcastEntityStateChangeEventNeverTenantProfile();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -122,9 +138,15 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
@Test
|
@Test
|
||||||
public void testSaveTenantProfileWithEmptyName() throws Exception {
|
public void testSaveTenantProfileWithEmptyName() throws Exception {
|
||||||
loginSysAdmin();
|
loginSysAdmin();
|
||||||
|
|
||||||
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
TenantProfile tenantProfile = new TenantProfile();
|
TenantProfile tenantProfile = new TenantProfile();
|
||||||
doPost("/api/tenantProfile", tenantProfile).andExpect(status().isBadRequest())
|
doPost("/api/tenantProfile", tenantProfile).andExpect(status().isBadRequest())
|
||||||
.andExpect(statusReason(containsString("Tenant profile name should be specified")));
|
.andExpect(status().isBadRequest())
|
||||||
|
.andExpect(statusReason(containsString("Tenant profile name " + msgErrorShouldBeSpecified)));
|
||||||
|
|
||||||
|
testBroadcastEntityStateChangeEventNeverTenantProfile();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -132,9 +154,15 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
loginSysAdmin();
|
loginSysAdmin();
|
||||||
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile");
|
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile");
|
||||||
doPost("/api/tenantProfile", tenantProfile).andExpect(status().isOk());
|
doPost("/api/tenantProfile", tenantProfile).andExpect(status().isOk());
|
||||||
|
|
||||||
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
TenantProfile tenantProfile2 = this.createTenantProfile("Tenant Profile");
|
TenantProfile tenantProfile2 = this.createTenantProfile("Tenant Profile");
|
||||||
doPost("/api/tenantProfile", tenantProfile2).andExpect(status().isBadRequest())
|
doPost("/api/tenantProfile", tenantProfile2)
|
||||||
|
.andExpect(status().isBadRequest())
|
||||||
.andExpect(statusReason(containsString("Tenant profile with such name already exists")));
|
.andExpect(statusReason(containsString("Tenant profile with such name already exists")));
|
||||||
|
|
||||||
|
testBroadcastEntityStateChangeEventNeverTenantProfile();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -144,8 +172,14 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
TenantProfile savedTenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
|
TenantProfile savedTenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
|
||||||
savedTenantProfile.setIsolatedTbRuleEngine(true);
|
savedTenantProfile.setIsolatedTbRuleEngine(true);
|
||||||
addMainQueueConfig(savedTenantProfile);
|
addMainQueueConfig(savedTenantProfile);
|
||||||
doPost("/api/tenantProfile", savedTenantProfile).andExpect(status().isBadRequest())
|
|
||||||
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
|
doPost("/api/tenantProfile", savedTenantProfile)
|
||||||
|
.andExpect(status().isBadRequest())
|
||||||
.andExpect(statusReason(containsString("Can't update isolatedTbRuleEngine property")));
|
.andExpect(statusReason(containsString("Can't update isolatedTbRuleEngine property")));
|
||||||
|
|
||||||
|
testBroadcastEntityStateChangeEventNeverTenantProfile();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -153,9 +187,15 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
loginSysAdmin();
|
loginSysAdmin();
|
||||||
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile");
|
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile");
|
||||||
TenantProfile savedTenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
|
TenantProfile savedTenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
|
||||||
|
|
||||||
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
savedTenantProfile.setIsolatedTbCore(true);
|
savedTenantProfile.setIsolatedTbCore(true);
|
||||||
doPost("/api/tenantProfile", savedTenantProfile).andExpect(status().isBadRequest())
|
doPost("/api/tenantProfile", savedTenantProfile)
|
||||||
|
.andExpect(status().isBadRequest())
|
||||||
.andExpect(statusReason(containsString("Can't update isolatedTbCore property")));
|
.andExpect(statusReason(containsString("Can't update isolatedTbCore property")));
|
||||||
|
|
||||||
|
testBroadcastEntityStateChangeEventNeverTenantProfile();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -169,10 +209,14 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
tenant.setTenantProfileId(savedTenantProfile.getId());
|
tenant.setTenantProfileId(savedTenantProfile.getId());
|
||||||
Tenant savedTenant = doPost("/api/tenant", tenant, Tenant.class);
|
Tenant savedTenant = doPost("/api/tenant", tenant, Tenant.class);
|
||||||
|
|
||||||
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
doDelete("/api/tenantProfile/" + savedTenantProfile.getId().getId().toString())
|
doDelete("/api/tenantProfile/" + savedTenantProfile.getId().getId().toString())
|
||||||
.andExpect(status().isBadRequest())
|
.andExpect(status().isBadRequest())
|
||||||
.andExpect(statusReason(containsString("The tenant profile referenced by the tenants cannot be deleted")));
|
.andExpect(statusReason(containsString("The tenant profile referenced by the tenants cannot be deleted")));
|
||||||
|
|
||||||
|
testBroadcastEntityStateChangeEventNeverTenantProfile();
|
||||||
|
|
||||||
doDelete("/api/tenant/"+savedTenant.getId().getId().toString())
|
doDelete("/api/tenant/"+savedTenant.getId().getId().toString())
|
||||||
.andExpect(status().isOk());
|
.andExpect(status().isOk());
|
||||||
}
|
}
|
||||||
@ -183,11 +227,16 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile");
|
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile");
|
||||||
TenantProfile savedTenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
|
TenantProfile savedTenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
|
||||||
|
|
||||||
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
doDelete("/api/tenantProfile/" + savedTenantProfile.getId().getId().toString())
|
doDelete("/api/tenantProfile/" + savedTenantProfile.getId().getId().toString())
|
||||||
.andExpect(status().isOk());
|
.andExpect(status().isOk());
|
||||||
|
|
||||||
|
testBroadcastEntityStateChangeEventTimeManyTimeTenantProfile(savedTenantProfile, ComponentLifecycleEvent.DELETED, 1);
|
||||||
|
|
||||||
doGet("/api/tenantProfile/" + savedTenantProfile.getId().getId().toString())
|
doGet("/api/tenantProfile/" + savedTenantProfile.getId().getId().toString())
|
||||||
.andExpect(status().isNotFound());
|
.andExpect(status().isNotFound())
|
||||||
|
.andExpect(statusReason(containsString(msgErrorNoFound("Tenant profile", savedTenantProfile.getId().getId().toString()))));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -196,21 +245,26 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
List<TenantProfile> tenantProfiles = new ArrayList<>();
|
List<TenantProfile> tenantProfiles = new ArrayList<>();
|
||||||
PageLink pageLink = new PageLink(17);
|
PageLink pageLink = new PageLink(17);
|
||||||
PageData<TenantProfile> pageData = doGetTypedWithPageLink("/api/tenantProfiles?",
|
PageData<TenantProfile> pageData = doGetTypedWithPageLink("/api/tenantProfiles?",
|
||||||
new TypeReference<PageData<TenantProfile>>(){}, pageLink);
|
new TypeReference<>(){}, pageLink);
|
||||||
Assert.assertFalse(pageData.hasNext());
|
Assert.assertFalse(pageData.hasNext());
|
||||||
Assert.assertEquals(1, pageData.getTotalElements());
|
Assert.assertEquals(1, pageData.getTotalElements());
|
||||||
tenantProfiles.addAll(pageData.getData());
|
tenantProfiles.addAll(pageData.getData());
|
||||||
|
|
||||||
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
|
int cntEntity = 28;
|
||||||
for (int i=0;i<28;i++) {
|
for (int i=0;i<28;i++) {
|
||||||
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile"+i);
|
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile"+i);
|
||||||
tenantProfiles.add(doPost("/api/tenantProfile", tenantProfile, TenantProfile.class));
|
tenantProfiles.add(doPost("/api/tenantProfile", tenantProfile, TenantProfile.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
testBroadcastEntityStateChangeEventTimeManyTimeTenantProfile(new TenantProfile(), ComponentLifecycleEvent.CREATED, cntEntity);
|
||||||
|
|
||||||
List<TenantProfile> loadedTenantProfiles = new ArrayList<>();
|
List<TenantProfile> loadedTenantProfiles = new ArrayList<>();
|
||||||
pageLink = new PageLink(17);
|
pageLink = new PageLink(17);
|
||||||
do {
|
do {
|
||||||
pageData = doGetTypedWithPageLink("/api/tenantProfiles?",
|
pageData = doGetTypedWithPageLink("/api/tenantProfiles?",
|
||||||
new TypeReference<PageData<TenantProfile>>(){}, pageLink);
|
new TypeReference<>(){}, pageLink);
|
||||||
loadedTenantProfiles.addAll(pageData.getData());
|
loadedTenantProfiles.addAll(pageData.getData());
|
||||||
if (pageData.hasNext()) {
|
if (pageData.hasNext()) {
|
||||||
pageLink = pageLink.nextPageLink();
|
pageLink = pageLink.nextPageLink();
|
||||||
@ -222,6 +276,8 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
|
|
||||||
Assert.assertEquals(tenantProfiles, loadedTenantProfiles);
|
Assert.assertEquals(tenantProfiles, loadedTenantProfiles);
|
||||||
|
|
||||||
|
Mockito.reset(tbClusterService);
|
||||||
|
|
||||||
for (TenantProfile tenantProfile : loadedTenantProfiles) {
|
for (TenantProfile tenantProfile : loadedTenantProfiles) {
|
||||||
if (!tenantProfile.isDefault()) {
|
if (!tenantProfile.isDefault()) {
|
||||||
doDelete("/api/tenantProfile/" + tenantProfile.getId().getId().toString())
|
doDelete("/api/tenantProfile/" + tenantProfile.getId().getId().toString())
|
||||||
@ -234,6 +290,8 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
new TypeReference<PageData<TenantProfile>>(){}, pageLink);
|
new TypeReference<PageData<TenantProfile>>(){}, pageLink);
|
||||||
Assert.assertFalse(pageData.hasNext());
|
Assert.assertFalse(pageData.hasNext());
|
||||||
Assert.assertEquals(1, pageData.getTotalElements());
|
Assert.assertEquals(1, pageData.getTotalElements());
|
||||||
|
|
||||||
|
testBroadcastEntityStateChangeEventTimeManyTimeTenantProfile(new TenantProfile(), ComponentLifecycleEvent.DELETED, cntEntity);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -242,7 +300,7 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
List<TenantProfile> tenantProfiles = new ArrayList<>();
|
List<TenantProfile> tenantProfiles = new ArrayList<>();
|
||||||
PageLink pageLink = new PageLink(17);
|
PageLink pageLink = new PageLink(17);
|
||||||
PageData<TenantProfile> tenantProfilePageData = doGetTypedWithPageLink("/api/tenantProfiles?",
|
PageData<TenantProfile> tenantProfilePageData = doGetTypedWithPageLink("/api/tenantProfiles?",
|
||||||
new TypeReference<PageData<TenantProfile>>(){}, pageLink);
|
new TypeReference<>(){}, pageLink);
|
||||||
Assert.assertFalse(tenantProfilePageData.hasNext());
|
Assert.assertFalse(tenantProfilePageData.hasNext());
|
||||||
Assert.assertEquals(1, tenantProfilePageData.getTotalElements());
|
Assert.assertEquals(1, tenantProfilePageData.getTotalElements());
|
||||||
tenantProfiles.addAll(tenantProfilePageData.getData());
|
tenantProfiles.addAll(tenantProfilePageData.getData());
|
||||||
@ -322,4 +380,28 @@ public abstract class BaseTenantProfileControllerTest extends AbstractController
|
|||||||
profileData.setQueueConfiguration(Collections.singletonList(mainQueueConfiguration));
|
profileData.setQueueConfiguration(Collections.singletonList(mainQueueConfiguration));
|
||||||
tenantProfile.setProfileData(profileData);
|
tenantProfile.setProfileData(profileData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void testBroadcastEntityStateChangeEventTimeManyTimeTenantProfile(TenantProfile tenantProfile, ComponentLifecycleEvent event, int cntTime) {
|
||||||
|
ArgumentMatcher<TenantProfile> matcherTenantProfile = cntTime == 1 ? argument -> argument.equals(tenantProfile) :
|
||||||
|
argument -> argument.getClass().equals(TenantProfile.class);
|
||||||
|
if (ComponentLifecycleEvent.DELETED.equals(event)) {
|
||||||
|
Mockito.verify(tbClusterService, times( cntTime)).onTenantProfileDelete(Mockito.argThat( matcherTenantProfile),
|
||||||
|
Mockito.isNull());
|
||||||
|
testBroadcastEntityStateChangeEventNever(createEntityId_NULL_UUID(new Tenant()));
|
||||||
|
} else {
|
||||||
|
Mockito.verify(tbClusterService, times( cntTime)).onTenantProfileChange(Mockito.argThat(matcherTenantProfile),
|
||||||
|
Mockito.isNull());
|
||||||
|
TenantProfileId tenantProfileIdId = cntTime == 1 ? tenantProfile.getId() : (TenantProfileId) createEntityId_NULL_UUID(tenantProfile);
|
||||||
|
testBroadcastEntityStateChangeEventTime(tenantProfileIdId, null, cntTime);
|
||||||
|
}
|
||||||
|
Mockito.reset(tbClusterService);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testBroadcastEntityStateChangeEventNeverTenantProfile() {
|
||||||
|
Mockito.verify(tbClusterService, never()).onTenantProfileChange(Mockito.any(TenantProfile.class),
|
||||||
|
Mockito.isNull());
|
||||||
|
testBroadcastEntityStateChangeEventNever(createEntityId_NULL_UUID(new Tenant()));
|
||||||
|
Mockito.reset(tbClusterService, auditLogService);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
69
docker/docker-compose.rabbitmq-server.yml
Normal file
69
docker/docker-compose.rabbitmq-server.yml
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
#
|
||||||
|
# Copyright © 2016-2022 The Thingsboard Authors
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
version: '2.2'
|
||||||
|
|
||||||
|
services:
|
||||||
|
rabbitmq:
|
||||||
|
restart: always
|
||||||
|
image: rabbitmq:3
|
||||||
|
ports:
|
||||||
|
- '5672:5672'
|
||||||
|
environment:
|
||||||
|
RABBITMQ_DEFAULT_USER: YOUR_USERNAME
|
||||||
|
RABBITMQ_DEFAULT_PASS: YOUR_PASSWORD
|
||||||
|
tb-js-executor:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-core1:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-core2:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-rule-engine1:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-rule-engine2:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-mqtt-transport1:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-mqtt-transport2:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-http-transport1:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-http-transport2:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-coap-transport:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-lwm2m-transport:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-snmp-transport:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-vc-executor1:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
|
tb-vc-executor2:
|
||||||
|
depends_on:
|
||||||
|
- rabbitmq
|
||||||
@ -22,14 +22,20 @@ import org.junit.extensions.cpsuite.ClasspathSuite;
|
|||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.testcontainers.containers.DockerComposeContainer;
|
import org.testcontainers.containers.DockerComposeContainer;
|
||||||
import org.testcontainers.containers.wait.strategy.Wait;
|
import org.testcontainers.containers.wait.strategy.Wait;
|
||||||
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.containsString;
|
import static org.hamcrest.CoreMatchers.containsString;
|
||||||
@ -44,11 +50,13 @@ import static org.junit.Assert.fail;
|
|||||||
public class ContainerTestSuite {
|
public class ContainerTestSuite {
|
||||||
final static boolean IS_REDIS_CLUSTER = Boolean.parseBoolean(System.getProperty("blackBoxTests.redisCluster"));
|
final static boolean IS_REDIS_CLUSTER = Boolean.parseBoolean(System.getProperty("blackBoxTests.redisCluster"));
|
||||||
final static boolean IS_HYBRID_MODE = Boolean.parseBoolean(System.getProperty("blackBoxTests.hybridMode"));
|
final static boolean IS_HYBRID_MODE = Boolean.parseBoolean(System.getProperty("blackBoxTests.hybridMode"));
|
||||||
|
final static String QUEUE_TYPE = System.getProperty("blackBoxTests.queue", "kafka");
|
||||||
private static final String SOURCE_DIR = "./../../docker/";
|
private static final String SOURCE_DIR = "./../../docker/";
|
||||||
private static final String TB_CORE_LOG_REGEXP = ".*Starting polling for events.*";
|
private static final String TB_CORE_LOG_REGEXP = ".*Starting polling for events.*";
|
||||||
private static final String TRANSPORTS_LOG_REGEXP = ".*Going to recalculate partitions.*";
|
private static final String TRANSPORTS_LOG_REGEXP = ".*Going to recalculate partitions.*";
|
||||||
private static final String TB_VC_LOG_REGEXP = TRANSPORTS_LOG_REGEXP;
|
private static final String TB_VC_LOG_REGEXP = TRANSPORTS_LOG_REGEXP;
|
||||||
private static final String TB_JS_EXECUTOR_KAFKA_LOG_REGEXP = ".*Consumer has joined the group.*";
|
private static final String TB_JS_EXECUTOR_KAFKA_LOG_REGEXP = ".*Consumer has joined the group.*";
|
||||||
|
private static final Duration CONTAINER_STARTUP_TIMEOUT = Duration.ofSeconds(400);
|
||||||
|
|
||||||
private static DockerComposeContainer<?> testContainer;
|
private static DockerComposeContainer<?> testContainer;
|
||||||
|
|
||||||
@ -82,17 +90,33 @@ public class ContainerTestSuite {
|
|||||||
List<File> composeFiles = new ArrayList<>(Arrays.asList(
|
List<File> composeFiles = new ArrayList<>(Arrays.asList(
|
||||||
new File(targetDir + "docker-compose.yml"),
|
new File(targetDir + "docker-compose.yml"),
|
||||||
new File(targetDir + "docker-compose.volumes.yml"),
|
new File(targetDir + "docker-compose.volumes.yml"),
|
||||||
IS_HYBRID_MODE
|
new File(targetDir + (IS_HYBRID_MODE ? "docker-compose.hybrid.yml" : "docker-compose.postgres.yml")),
|
||||||
? new File(targetDir + "docker-compose.hybrid.yml")
|
|
||||||
: new File(targetDir + "docker-compose.postgres.yml"),
|
|
||||||
new File(targetDir + "docker-compose.postgres.volumes.yml"),
|
new File(targetDir + "docker-compose.postgres.volumes.yml"),
|
||||||
new File(targetDir + "docker-compose.kafka.yml"),
|
new File(targetDir + "docker-compose." + QUEUE_TYPE + ".yml"),
|
||||||
IS_REDIS_CLUSTER
|
new File(targetDir + (IS_REDIS_CLUSTER ? "docker-compose.redis-cluster.yml" : "docker-compose.redis.yml")),
|
||||||
? new File(targetDir + "docker-compose.redis-cluster.yml")
|
new File(targetDir + (IS_HYBRID_MODE ? "docker-compose.redis-cluster.volumes.yml" : "docker-compose.redis.volumes.yml"))
|
||||||
: new File(targetDir + "docker-compose.redis.yml"),
|
));
|
||||||
IS_REDIS_CLUSTER
|
|
||||||
? new File(targetDir + "docker-compose.redis-cluster.volumes.yml")
|
Map<String, String> queueEnv = new HashMap<>();
|
||||||
: new File(targetDir + "docker-compose.redis.volumes.yml")));
|
queueEnv.put("TB_QUEUE_TYPE", QUEUE_TYPE);
|
||||||
|
switch (QUEUE_TYPE) {
|
||||||
|
case "kafka":
|
||||||
|
composeFiles.add(new File(targetDir + "docker-compose.kafka.yml"));
|
||||||
|
break;
|
||||||
|
case "aws-sqs":
|
||||||
|
replaceInFile(targetDir, "queue-aws-sqs.env",
|
||||||
|
Map.of("YOUR_KEY", getSysProp("blackBoxTests.awsKey"),
|
||||||
|
"YOUR_SECRET", "blackBoxTests.awsSecret",
|
||||||
|
"YOUR_REGION", "blackBoxTests.awsRegion"));
|
||||||
|
break;
|
||||||
|
case "rabbitmq":
|
||||||
|
composeFiles.add(new File(targetDir + "docker-compose.rabbitmq-server.yml"));
|
||||||
|
replaceInFile(targetDir, "queue-rabbitmq.env",
|
||||||
|
Map.of("localhost", "rabbitmq"));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new RuntimeException("Unsupported queue type: " + QUEUE_TYPE);
|
||||||
|
}
|
||||||
|
|
||||||
if (IS_HYBRID_MODE) {
|
if (IS_HYBRID_MODE) {
|
||||||
composeFiles.add(new File(targetDir + "docker-compose.cassandra.volumes.yml"));
|
composeFiles.add(new File(targetDir + "docker-compose.cassandra.volumes.yml"));
|
||||||
@ -103,17 +127,18 @@ public class ContainerTestSuite {
|
|||||||
.withLocalCompose(true)
|
.withLocalCompose(true)
|
||||||
.withTailChildContainers(!skipTailChildContainers)
|
.withTailChildContainers(!skipTailChildContainers)
|
||||||
.withEnv(installTb.getEnv())
|
.withEnv(installTb.getEnv())
|
||||||
|
.withEnv(queueEnv)
|
||||||
.withEnv("LOAD_BALANCER_NAME", "")
|
.withEnv("LOAD_BALANCER_NAME", "")
|
||||||
.withExposedService("haproxy", 80, Wait.forHttp("/swagger-ui.html").withStartupTimeout(Duration.ofSeconds(400)))
|
.withExposedService("haproxy", 80, Wait.forHttp("/swagger-ui.html").withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
|
||||||
.waitingFor("tb-core1", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(Duration.ofSeconds(400)))
|
.waitingFor("tb-core1", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
|
||||||
.waitingFor("tb-core2", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(Duration.ofSeconds(400)))
|
.waitingFor("tb-core2", Wait.forLogMessage(TB_CORE_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
|
||||||
.waitingFor("tb-http-transport1", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(Duration.ofSeconds(400)))
|
.waitingFor("tb-http-transport1", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
|
||||||
.waitingFor("tb-http-transport2", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(Duration.ofSeconds(400)))
|
.waitingFor("tb-http-transport2", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
|
||||||
.waitingFor("tb-mqtt-transport1", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(Duration.ofSeconds(400)))
|
.waitingFor("tb-mqtt-transport1", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
|
||||||
.waitingFor("tb-mqtt-transport2", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(Duration.ofSeconds(400)))
|
.waitingFor("tb-mqtt-transport2", Wait.forLogMessage(TRANSPORTS_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
|
||||||
.waitingFor("tb-vc-executor1", Wait.forLogMessage(TB_VC_LOG_REGEXP, 1).withStartupTimeout(Duration.ofSeconds(400)))
|
.waitingFor("tb-vc-executor1", Wait.forLogMessage(TB_VC_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT))
|
||||||
.waitingFor("tb-vc-executor2", Wait.forLogMessage(TB_VC_LOG_REGEXP, 1).withStartupTimeout(Duration.ofSeconds(400)))
|
.waitingFor("tb-vc-executor2", Wait.forLogMessage(TB_VC_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT));
|
||||||
.waitingFor("tb-js-executor", Wait.forLogMessage(TB_JS_EXECUTOR_KAFKA_LOG_REGEXP, 1).withStartupTimeout(Duration.ofSeconds(400)));
|
.waitingFor("tb-js-executor", Wait.forLogMessage(TB_JS_EXECUTOR_KAFKA_LOG_REGEXP, 1).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Failed to create test container", e);
|
log.error("Failed to create test container", e);
|
||||||
fail("Failed to create test container");
|
fail("Failed to create test container");
|
||||||
@ -122,6 +147,23 @@ public class ContainerTestSuite {
|
|||||||
return testContainer;
|
return testContainer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void replaceInFile(String targetDir, String fileName, Map<String, String> replacements) throws IOException {
|
||||||
|
Path envFilePath = Path.of(targetDir, fileName);
|
||||||
|
String data = Files.readString(envFilePath);
|
||||||
|
for (var entry : replacements.entrySet()) {
|
||||||
|
data = data.replace(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
Files.write(envFilePath, data.getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getSysProp(String propertyName) {
|
||||||
|
var value = System.getProperty(propertyName);
|
||||||
|
if (StringUtils.isEmpty(value)) {
|
||||||
|
throw new RuntimeException("Please define system property: " + propertyName + "!");
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
private static void tryDeleteDir(String targetDir) {
|
private static void tryDeleteDir(String targetDir) {
|
||||||
try {
|
try {
|
||||||
log.info("Trying to delete temp dir {}", targetDir);
|
log.info("Trying to delete temp dir {}", targetDir);
|
||||||
@ -137,7 +179,7 @@ public class ContainerTestSuite {
|
|||||||
* docker-compose files which contain container_name are not supported and the creation of DockerComposeContainer fails due to IllegalStateException.
|
* docker-compose files which contain container_name are not supported and the creation of DockerComposeContainer fails due to IllegalStateException.
|
||||||
* This has been introduced in #1151 as a quick fix for unintuitive feedback. https://github.com/testcontainers/testcontainers-java/issues/1151
|
* This has been introduced in #1151 as a quick fix for unintuitive feedback. https://github.com/testcontainers/testcontainers-java/issues/1151
|
||||||
* Using the latest testcontainers and waiting for the fix...
|
* Using the latest testcontainers and waiting for the fix...
|
||||||
* */
|
*/
|
||||||
private static void replaceInFile(String sourceFilename, String target, String replacement, String verifyPhrase) {
|
private static void replaceInFile(String sourceFilename, String target, String replacement, String verifyPhrase) {
|
||||||
try {
|
try {
|
||||||
File file = new File(sourceFilename);
|
File file = new File(sourceFilename);
|
||||||
|
|||||||
@ -20,7 +20,6 @@ import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor'
|
|||||||
import { IQueue } from './queue.models';
|
import { IQueue } from './queue.models';
|
||||||
import amqp, { ConfirmChannel, Connection } from 'amqplib';
|
import amqp, { ConfirmChannel, Connection } from 'amqplib';
|
||||||
import { Options, Replies } from 'amqplib/properties';
|
import { Options, Replies } from 'amqplib/properties';
|
||||||
import { sleep } from '../api/utils';
|
|
||||||
|
|
||||||
export class RabbitMqTemplate implements IQueue {
|
export class RabbitMqTemplate implements IQueue {
|
||||||
|
|
||||||
@ -32,7 +31,6 @@ export class RabbitMqTemplate implements IQueue {
|
|||||||
private username = config.get('rabbitmq.username');
|
private username = config.get('rabbitmq.username');
|
||||||
private password = config.get('rabbitmq.password');
|
private password = config.get('rabbitmq.password');
|
||||||
private queueProperties: string = config.get('rabbitmq.queue_properties');
|
private queueProperties: string = config.get('rabbitmq.queue_properties');
|
||||||
private pollInterval = Number(config.get('js.response_poll_interval'));
|
|
||||||
|
|
||||||
private queueOptions: Options.AssertQueue = {
|
private queueOptions: Options.AssertQueue = {
|
||||||
durable: false,
|
durable: false,
|
||||||
@ -41,7 +39,6 @@ export class RabbitMqTemplate implements IQueue {
|
|||||||
};
|
};
|
||||||
private connection: Connection;
|
private connection: Connection;
|
||||||
private channel: ConfirmChannel;
|
private channel: ConfirmChannel;
|
||||||
private stopped = false;
|
|
||||||
private topics: string[] = [];
|
private topics: string[] = [];
|
||||||
|
|
||||||
name = 'RabbitMQ';
|
name = 'RabbitMQ';
|
||||||
@ -60,20 +57,12 @@ export class RabbitMqTemplate implements IQueue {
|
|||||||
|
|
||||||
const messageProcessor = new JsInvokeMessageProcessor(this);
|
const messageProcessor = new JsInvokeMessageProcessor(this);
|
||||||
|
|
||||||
while (!this.stopped) {
|
await this.channel.consume(this.requestTopic, (message) => {
|
||||||
let pollStartTs = new Date().getTime();
|
|
||||||
let message = await this.channel.get(this.requestTopic);
|
|
||||||
|
|
||||||
if (message) {
|
if (message) {
|
||||||
messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
|
messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8')));
|
||||||
this.channel.ack(message);
|
this.channel.ack(message);
|
||||||
} else {
|
|
||||||
let pollDuration = new Date().getTime() - pollStartTs;
|
|
||||||
if (pollDuration < this.pollInterval) {
|
|
||||||
await sleep(this.pollInterval - pollDuration);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
|
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user