diff --git a/application/src/main/data/json/edge/instructions/upgrade/docker/upgrade_db.md b/application/src/main/data/json/edge/instructions/upgrade/docker/upgrade_db.md index a594ebe4e7..386c5f2743 100644 --- a/application/src/main/data/json/edge/instructions/upgrade/docker/upgrade_db.md +++ b/application/src/main/data/json/edge/instructions/upgrade/docker/upgrade_db.md @@ -43,19 +43,6 @@ volumes: Execute the following command to start upgrade process: ```bash -docker compose -f docker-compose-upgrade.yml up +docker compose -f docker-compose-upgrade.yml up --abort-on-container-exit {:copy-code} -``` - -Once upgrade process successfully completed, exit from the docker-compose shell by this combination: - -```text -Ctrl + C -``` - -Execute the following command to stop TB Edge upgrade container: - -```bash -docker compose -f docker-compose-upgrade.yml stop -{:copy-code} -``` +``` \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 77bf7c32fa..a29fcfc362 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -126,8 +126,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso final TenantId tenantId; final DeviceId deviceId; final LinkedHashMapRemoveEldest sessions; - private final Map attributeSubscriptions; - private final Map rpcSubscriptions; + final Map attributeSubscriptions; + final Map rpcSubscriptions; private final Map toDeviceRpcPendingMap; private final boolean rpcSequential; private final RpcSubmitStrategy rpcSubmitStrategy; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 7c958343ca..9f7ace9df8 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -404,6 +404,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void scheduleEdgeEventsCheck(EdgeGrpcSession session) { EdgeId edgeId = session.getEdge().getId(); TenantId tenantId = session.getEdge().getTenantId(); + + cancelScheduleEdgeEventsCheck(edgeId); + if (sessions.containsKey(edgeId)) { ScheduledFuture edgeEventCheckTask = edgeEventProcessingExecutorService.schedule(() -> { try { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 137c4d5bf5..2e4d5c8aa5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -448,7 +448,11 @@ public abstract class EdgeGrpcSession implements Closeable { private void scheduleDownlinkMsgsPackSend(int attempt) { Runnable sendDownlinkMsgsTask = () -> { try { - if (isConnected() && !sessionState.getPendingMsgsMap().values().isEmpty()) { + if (!isConnected()) { + stopCurrentSendDownlinkMsgsTask(true); + return; + } + if (!sessionState.getPendingMsgsMap().values().isEmpty()) { List copy = new ArrayList<>(sessionState.getPendingMsgsMap().values()); if (attempt > 1) { String error = "Failed to deliver the batch"; @@ -529,6 +533,11 @@ public abstract class EdgeGrpcSession implements Closeable { log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg); } else { log.error("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg()); + DownlinkMsg downlinkMsg = sessionState.getPendingMsgsMap().get(msg.getDownlinkMsgId()); + // if NOT timeseries or attributes failures - ack failed downlink + if (downlinkMsg.getEntityDataCount() == 0) { + sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId()); + } } if (sessionState.getPendingMsgsMap().isEmpty()) { log.debug("[{}][{}][{}] Pending msgs map is empty. Stopping current iteration", tenantId, edge.getId(), sessionId); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java index 7426207a56..7fab0094b4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java @@ -87,6 +87,7 @@ public class EdgeSyncCursor { fetchers.add(new SystemWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService())); fetchers.add(new TenantWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService())); fetchers.add(new OtaPackagesEdgeEventFetcher(ctx.getOtaPackageService())); + fetchers.add(new DeviceProfilesEdgeEventFetcher(ctx.getDeviceProfileService())); fetchers.add(new TenantResourcesEdgeEventFetcher(ctx.getResourceService())); fetchers.add(new OAuth2EdgeEventFetcher(ctx.getDomainService())); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java index b2a1044981..93461f8c39 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java @@ -54,23 +54,25 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher { log.trace("[{}] Finding general edge events [{}], seqIdStart = {}, pageLink = {}", tenantId, edge.getId(), seqIdStart, pageLink); PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), seqIdStart, null, (TimePageLink) pageLink); - if (edgeEvents.getData().isEmpty()) { + if (!edgeEvents.getData().isEmpty()) { + return edgeEvents; + } + if (seqIdStart > this.maxReadRecordsCount) { edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), 0L, Math.max(this.maxReadRecordsCount, seqIdStart - this.maxReadRecordsCount), (TimePageLink) pageLink); if (edgeEvents.getData().stream().anyMatch(ee -> ee.getSeqId() < seqIdStart)) { log.info("[{}] seqId column of edge_event table started new cycle [{}]", tenantId, edge.getId()); this.seqIdNewCycleStarted = true; this.seqIdStart = 0L; - } else { - edgeEvents = new PageData<>(); - log.warn("[{}] unexpected edge notification message received. " + - "no new events found and seqId column of edge_event table doesn't started new cycle [{}]", tenantId, edge.getId()); + return edgeEvents; } } - return edgeEvents; + log.info("[{}] Unexpected edge notification message received. " + + "No new events found, and the seqId column of the edge_event table has not started a new cycle [{}].", tenantId, edge.getId()); + return new PageData<>(); } catch (Exception e) { - log.error("[{}] failed to find edge events [{}]", tenantId, edge.getId()); + log.error("[{}] Failed to find edge events [{}]", tenantId, edge.getId(), e); + return new PageData<>(); } - return new PageData<>(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index eaf3c1a95a..50229f9440 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -120,7 +120,7 @@ public abstract class BaseEdgeProcessor { private boolean doSaveIfEdgeIsOffline(EdgeEventType type, EdgeEventActionType action) { return switch (action) { case TIMESERIES_UPDATED, ALARM_ACK, ALARM_CLEAR, ALARM_ASSIGNED, ALARM_UNASSIGNED, ADDED_COMMENT, - UPDATED_COMMENT -> true; + UPDATED_COMMENT, DELETED -> true; default -> switch (type) { case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE, WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION, NOTIFICATION_TEMPLATE, NOTIFICATION_TARGET, diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/BaseDashboardProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/BaseDashboardProcessor.java index faf001fbba..39007552ad 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/BaseDashboardProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/BaseDashboardProcessor.java @@ -41,7 +41,10 @@ public abstract class BaseDashboardProcessor extends BaseEdgeProcessor { if (dashboard == null) { throw new RuntimeException("[{" + tenantId + "}] dashboardUpdateMsg {" + dashboardUpdateMsg + "} cannot be converted to dashboard"); } - Set newAssignedCustomers = new HashSet<>(dashboard.getAssignedCustomers()); + Set newAssignedCustomers = new HashSet<>(); + if (dashboard.getAssignedCustomers() != null && !dashboard.getAssignedCustomers().isEmpty()) { + newAssignedCustomers.addAll(dashboard.getAssignedCustomers()); + } Dashboard dashboardById = edgeCtx.getDashboardService().findDashboardById(tenantId, dashboardId); if (dashboardById == null) { created = true; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/EdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/EdgeRequestsService.java index 6d47c16680..e04d4d01af 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/EdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/EdgeRequestsService.java @@ -28,17 +28,22 @@ import org.thingsboard.server.gen.edge.v1.WidgetBundleTypesRequestMsg; public interface EdgeRequestsService { + @Deprecated(since = "3.9.1", forRemoval = true) ListenableFuture processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg); ListenableFuture processAttributesRequestMsg(TenantId tenantId, Edge edge, AttributesRequestMsg attributesRequestMsg); ListenableFuture processRelationRequestMsg(TenantId tenantId, Edge edge, RelationRequestMsg relationRequestMsg); + @Deprecated(since = "3.9.1", forRemoval = true) ListenableFuture processDeviceCredentialsRequestMsg(TenantId tenantId, Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg); + @Deprecated(since = "3.9.1", forRemoval = true) ListenableFuture processUserCredentialsRequestMsg(TenantId tenantId, Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg); + @Deprecated(since = "3.9.1", forRemoval = true) ListenableFuture processWidgetBundleTypesRequestMsg(TenantId tenantId, Edge edge, WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg); + @Deprecated(since = "3.9.1", forRemoval = true) ListenableFuture processEntityViewsRequestMsg(TenantId tenantId, Edge edge, EntityViewsRequestMsg entityViewsRequestMsg); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 368367bbab..27478c8a01 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -212,9 +212,9 @@ ui: database: ts_max_intervals: "${DATABASE_TS_MAX_INTERVALS:700}" # Max number of DB queries generated by a single API call to fetch telemetry records ts: - type: "${DATABASE_TS_TYPE:sql}" # cassandra or sql. timescale option is deprecated and will no longer be supported in ThingsBoard 4.0 + type: "${DATABASE_TS_TYPE:sql}" # cassandra, sql, or timescale (for hybrid mode, DATABASE_TS_TYPE value should be cassandra, or timescale) ts_latest: - type: "${DATABASE_TS_LATEST_TYPE:sql}" # cassandra or sql. timescale option is deprecated and will no longer be supported in ThingsBoard 4.0 + type: "${DATABASE_TS_LATEST_TYPE:sql}" # cassandra, sql, or timescale (for hybrid mode, DATABASE_TS_TYPE value should be cassandra, or timescale) # Cassandra driver configuration parameters cassandra: diff --git a/application/src/test/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessorTest.java b/application/src/test/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessorTest.java index 35f53587e5..4f571d4fe6 100644 --- a/application/src/test/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessorTest.java @@ -17,22 +17,27 @@ package org.thingsboard.server.actors.device; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.thingsboard.common.util.LinkedHashMapRemoveEldest; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.service.transport.TbCoreToTransportService; + +import java.util.UUID; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; public class DeviceActorMessageProcessorTest { - public static final long MAX_CONCURRENT_SESSIONS_PER_DEVICE = 10L; + public static final int MAX_CONCURRENT_SESSIONS_PER_DEVICE = 10; ActorSystemContext systemContext; DeviceService deviceService; TenantId tenantId = TenantId.SYS_TENANT_ID; @@ -44,15 +49,38 @@ public class DeviceActorMessageProcessorTest { public void setUp() { systemContext = mock(ActorSystemContext.class); deviceService = mock(DeviceService.class); - willReturn(MAX_CONCURRENT_SESSIONS_PER_DEVICE).given(systemContext).getMaxConcurrentSessionsPerDevice(); + willReturn((long)MAX_CONCURRENT_SESSIONS_PER_DEVICE).given(systemContext).getMaxConcurrentSessionsPerDevice(); willReturn(deviceService).given(systemContext).getDeviceService(); processor = new DeviceActorMessageProcessor(systemContext, tenantId, deviceId); + willReturn(mock(TbCoreToTransportService.class)).given(systemContext).getTbCoreToTransportService(); } @Test public void givenSystemContext_whenNewInstance_thenVerifySessionMapMaxSize() { assertThat(processor.sessions, instanceOf(LinkedHashMapRemoveEldest.class)); - assertThat(processor.sessions.getMaxEntries(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE)); + assertThat(processor.sessions.getMaxEntries(), is((long)MAX_CONCURRENT_SESSIONS_PER_DEVICE)); assertThat(processor.sessions.getRemovalConsumer(), notNullValue()); } + + @Test + public void givenFullSessionMap_whenSessionOverflow_thenShouldDeleteAttributeAndRPCSubscriptions() { + //givenFullSessionMap + for (int i = 0; i < MAX_CONCURRENT_SESSIONS_PER_DEVICE; i++) { + UUID sessionID = UUID.randomUUID(); + processor.sessions.put(sessionID, Mockito.mock(SessionInfoMetaData.class, RETURNS_DEEP_STUBS)); + processor.attributeSubscriptions.put(sessionID, Mockito.mock(SessionInfo.class)); + processor.rpcSubscriptions.put(sessionID, Mockito.mock(SessionInfo.class)); + } + assertThat(processor.sessions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE)); + assertThat(processor.attributeSubscriptions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE)); + assertThat(processor.rpcSubscriptions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE)); + + //add one more + processor.sessions.put(UUID.randomUUID(), Mockito.mock(SessionInfoMetaData.class)); + + assertThat(processor.sessions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE)); + assertThat(processor.attributeSubscriptions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE-1)); + assertThat(processor.rpcSubscriptions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE-1)); + + } } \ No newline at end of file diff --git a/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java index 4d38934dcb..0ffdbe8e60 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdgeControllerTest.java @@ -897,7 +897,7 @@ public class EdgeControllerTest extends AbstractControllerTest { edgeImitator.ignoreType(OAuth2ClientUpdateMsg.class); edgeImitator.ignoreType(OAuth2DomainUpdateMsg.class); - edgeImitator.expectMessageAmount(26); + edgeImitator.expectMessageAmount(27); edgeImitator.connect(); waitForMessages(edgeImitator); @@ -1003,6 +1003,7 @@ public class EdgeControllerTest extends AbstractControllerTest { Assert.assertTrue(popAdminSettingsMsg(edgeImitator.getDownlinkMsgs(), "connectivity")); Assert.assertTrue(popAdminSettingsMsg(edgeImitator.getDownlinkMsgs(), "jwt")); Assert.assertTrue(popDeviceProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default")); + Assert.assertTrue(popDeviceProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default")); Assert.assertTrue(popAssetProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default")); Assert.assertTrue(popDeviceProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default")); Assert.assertTrue(popAssetProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default")); diff --git a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java index 10db03a6c2..afc0f72081 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java @@ -144,7 +144,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret()); edgeImitator.ignoreType(OAuth2ClientUpdateMsg.class); edgeImitator.ignoreType(OAuth2DomainUpdateMsg.class); - edgeImitator.expectMessageAmount(24); + edgeImitator.expectMessageAmount(26); edgeImitator.connect(); requestEdgeRuleChainMetadata(); @@ -265,10 +265,10 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { // 4 messages // - 1 from default profile fetcher - // - 2 from device profile fetcher (default and thermostat) + // - 4 from device profile fetcher (2 * (default and thermostat) before and after ota packages fetcher // - 1 from device fetcher - validateMsgsCnt(DeviceProfileUpdateMsg.class, 4); - validateDeviceProfiles(4); + validateMsgsCnt(DeviceProfileUpdateMsg.class, 6); + validateDeviceProfiles(6); // 3 messages // - 1 from default profile fetcher @@ -656,7 +656,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { } protected RuleChainId createEdgeRuleChainAndAssignToEdge(String ruleChainName) throws Exception { - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); RuleChain ruleChain = new RuleChain(); ruleChain.setName(ruleChainName); ruleChain.setType(RuleChainType.EDGE); diff --git a/application/src/test/java/org/thingsboard/server/edge/DashboardEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DashboardEdgeTest.java index 5b09e05a09..42b75b5a74 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DashboardEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DashboardEdgeTest.java @@ -48,12 +48,14 @@ public class DashboardEdgeTest extends AbstractEdgeTest { private static final int MOBILE_ORDER = 5; private static final String IMAGE = ""; + private static final String DASHBOARD_TITLE = "Edge Test Dashboard"; + @Test public void testDashboards() throws Exception { // create dashboard and assign to edge edgeImitator.expectMessageAmount(2); Dashboard dashboard = new Dashboard(); - dashboard.setTitle("Edge Test Dashboard"); + dashboard.setTitle(DASHBOARD_TITLE); dashboard.setMobileHide(true); dashboard.setImage(IMAGE); dashboard.setMobileOrder(MOBILE_ORDER); @@ -200,12 +202,27 @@ public class DashboardEdgeTest extends AbstractEdgeTest { Dashboard foundDashboard = doGet("/api/dashboard/" + dashboard.getUuidId(), Dashboard.class); Assert.assertNotNull(foundDashboard); - Assert.assertEquals("Edge Test Dashboard", foundDashboard.getName()); + Assert.assertEquals(DASHBOARD_TITLE, foundDashboard.getName()); PageData pageData = doGetTypedWithPageLink("/api/customer/" + savedCustomer.getId().toString() + "/dashboards?", new TypeReference<>() {}, new PageLink(100)); Assert.assertEquals(1, pageData.getData().size()); - Assert.assertEquals("Edge Test Dashboard", pageData.getData().get(0).getTitle()); + Assert.assertEquals(DASHBOARD_TITLE, pageData.getData().get(0).getTitle()); + + dashboard.setTitle(DASHBOARD_TITLE + " Updated"); + dashboard.setAssignedCustomers(null); + dashboardUpdateMsgBuilder.setEntity(JacksonUtil.toString(dashboard)); + dashboardUpdateMsgBuilder.setMsgType(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE); + uplinkMsgBuilder = UplinkMsg.newBuilder(); + uplinkMsgBuilder.addDashboardUpdateMsg(dashboardUpdateMsgBuilder.build()); + + edgeImitator.expectResponsesAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + + Assert.assertTrue(edgeImitator.waitForResponses()); + + foundDashboard = doGet("/api/dashboard/" + dashboard.getUuidId(), Dashboard.class); + Assert.assertEquals(DASHBOARD_TITLE + " Updated", foundDashboard.getName()); } @Test @@ -256,7 +273,7 @@ public class DashboardEdgeTest extends AbstractEdgeTest { Dashboard dashboard = new Dashboard(); dashboard.setId(new DashboardId(UUID.randomUUID())); dashboard.setTenantId(tenantId); - dashboard.setTitle("Edge Test Dashboard"); + dashboard.setTitle(DASHBOARD_TITLE); dashboard.setAssignedCustomers(Sets.newHashSet(new ShortCustomerInfo(savedCustomer.getId(), savedCustomer.getTitle(), savedCustomer.isPublic()))); return dashboard; } diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceProfileEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceProfileEdgeTest.java index 1202c764d7..c0b4fbdd61 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceProfileEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceProfileEdgeTest.java @@ -126,6 +126,45 @@ public class DeviceProfileEdgeTest extends AbstractEdgeTest { unAssignFromEdgeAndDeleteDashboard(thermostatsDashboardId); } + @Test + public void testDeleteDeviceProfilesWhenEdgeIsOffline() throws Exception { + RuleChainId thermostatsRuleChainId = createEdgeRuleChainAndAssignToEdge("Thermostats Rule Chain"); + + // create device profile + DeviceProfile deviceProfile = this.createDeviceProfile("ONE_MORE_DEVICE_PROFILE", null); + deviceProfile.setDefaultEdgeRuleChainId(thermostatsRuleChainId); + extendDeviceProfileData(deviceProfile); + edgeImitator.expectMessageAmount(1); + deviceProfile = doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof DeviceProfileUpdateMsg); + DeviceProfileUpdateMsg deviceProfileUpdateMsg = (DeviceProfileUpdateMsg) latestMessage; + DeviceProfile deviceProfileMsg = JacksonUtil.fromString(deviceProfileUpdateMsg.getEntity(), DeviceProfile.class, true); + Assert.assertNotNull(deviceProfileMsg); + Assert.assertEquals(deviceProfile, deviceProfileMsg); + Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, deviceProfileUpdateMsg.getMsgType()); + + // delete profile when edge is offline + edgeImitator.disconnect(); + doDelete("/api/deviceProfile/" + deviceProfile.getUuidId()) + .andExpect(status().isOk()); + edgeImitator.connect(); + // 27 sync message + // + 1 delete message + edgeImitator.expectMessageAmount(28); + Assert.assertTrue(edgeImitator.waitForMessages()); + + latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof DeviceProfileUpdateMsg); + deviceProfileUpdateMsg = (DeviceProfileUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE, deviceProfileUpdateMsg.getMsgType()); + Assert.assertEquals(deviceProfile.getUuidId().getMostSignificantBits(), deviceProfileUpdateMsg.getIdMSB()); + Assert.assertEquals(deviceProfile.getUuidId().getLeastSignificantBits(), deviceProfileUpdateMsg.getIdLSB()); + + unAssignFromEdgeAndDeleteRuleChain(thermostatsRuleChainId); + } + @Test public void testDeviceProfiles_snmp() throws Exception { DeviceProfile deviceProfile = createDeviceProfileAndDoBasicAssert("SNMP", createSnmpDeviceProfileTransportConfiguration()); @@ -449,4 +488,5 @@ public class DeviceProfileEdgeTest extends AbstractEdgeTest { deviceProfile.setProfileData(createProfileData()); return deviceProfile; } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index dd5d61e834..0767e5e303 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -96,6 +96,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TbQueueAdmin edgeEventAdmin; private final AtomicLong consumerCount = new AtomicLong(); + private final AtomicLong edgeConsumerCount = new AtomicLong(); public KafkaMonolithQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, TbServiceInfoProvider serviceInfoProvider, @@ -472,7 +473,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId)); - consumerBuilder.clientId("monolith-to-edge-event-consumer" + serviceInfoProvider.getServiceId()); + consumerBuilder.clientId("monolith-to-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet()); consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer")); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(edgeEventAdmin); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java index cc0e044917..06fd6147a8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbCoreQueueFactory.java @@ -95,6 +95,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { private final TbQueueAdmin edgeEventAdmin; private final AtomicLong consumerCount = new AtomicLong(); + private final AtomicLong edgeConsumerCount = new AtomicLong(); public KafkaTbCoreQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, @@ -421,7 +422,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId)); - consumerBuilder.clientId("tb-core-edge-event-consumer" + serviceInfoProvider.getServiceId()); + consumerBuilder.clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet()); consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer")); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(edgeEventAdmin); diff --git a/ui-ngx/src/app/modules/home/components/widget/config/basic/indicator/liquid-level-card-basic-config.component.html b/ui-ngx/src/app/modules/home/components/widget/config/basic/indicator/liquid-level-card-basic-config.component.html index 9808548bbe..a0776f5aa8 100644 --- a/ui-ngx/src/app/modules/home/components/widget/config/basic/indicator/liquid-level-card-basic-config.component.html +++ b/ui-ngx/src/app/modules/home/components/widget/config/basic/indicator/liquid-level-card-basic-config.component.html @@ -63,7 +63,7 @@ -
+
widgets.liquid-level-card.shape
diff --git a/ui-ngx/src/app/modules/home/components/widget/lib/settings/indicator/liquid-level-card-widget-settings.component.html b/ui-ngx/src/app/modules/home/components/widget/lib/settings/indicator/liquid-level-card-widget-settings.component.html index a1aae64747..8d787116a8 100644 --- a/ui-ngx/src/app/modules/home/components/widget/lib/settings/indicator/liquid-level-card-widget-settings.component.html +++ b/ui-ngx/src/app/modules/home/components/widget/lib/settings/indicator/liquid-level-card-widget-settings.component.html @@ -18,7 +18,7 @@
-
+
widgets.liquid-level-card.shape