Merge branch 'rc' of github.com:thingsboard/thingsboard
This commit is contained in:
		
						commit
						6dd08276f8
					
				@ -43,19 +43,6 @@ volumes:
 | 
			
		||||
Execute the following command to start upgrade process:
 | 
			
		||||
 | 
			
		||||
```bash
 | 
			
		||||
docker compose -f docker-compose-upgrade.yml up
 | 
			
		||||
{:copy-code}
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
Once upgrade process successfully completed, exit from the docker-compose shell by this combination:
 | 
			
		||||
 | 
			
		||||
```text
 | 
			
		||||
Ctrl + C
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
Execute the following command to stop TB Edge upgrade container:
 | 
			
		||||
 | 
			
		||||
```bash
 | 
			
		||||
docker compose -f docker-compose-upgrade.yml stop
 | 
			
		||||
docker compose -f docker-compose-upgrade.yml up --abort-on-container-exit
 | 
			
		||||
{:copy-code}
 | 
			
		||||
```
 | 
			
		||||
@ -126,8 +126,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
 | 
			
		||||
    final TenantId tenantId;
 | 
			
		||||
    final DeviceId deviceId;
 | 
			
		||||
    final LinkedHashMapRemoveEldest<UUID, SessionInfoMetaData> sessions;
 | 
			
		||||
    private final Map<UUID, SessionInfo> attributeSubscriptions;
 | 
			
		||||
    private final Map<UUID, SessionInfo> rpcSubscriptions;
 | 
			
		||||
    final Map<UUID, SessionInfo> attributeSubscriptions;
 | 
			
		||||
    final Map<UUID, SessionInfo> rpcSubscriptions;
 | 
			
		||||
    private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
 | 
			
		||||
    private final boolean rpcSequential;
 | 
			
		||||
    private final RpcSubmitStrategy rpcSubmitStrategy;
 | 
			
		||||
 | 
			
		||||
@ -404,6 +404,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
 | 
			
		||||
    private void scheduleEdgeEventsCheck(EdgeGrpcSession session) {
 | 
			
		||||
        EdgeId edgeId = session.getEdge().getId();
 | 
			
		||||
        TenantId tenantId = session.getEdge().getTenantId();
 | 
			
		||||
 | 
			
		||||
        cancelScheduleEdgeEventsCheck(edgeId);
 | 
			
		||||
 | 
			
		||||
        if (sessions.containsKey(edgeId)) {
 | 
			
		||||
            ScheduledFuture<?> edgeEventCheckTask = edgeEventProcessingExecutorService.schedule(() -> {
 | 
			
		||||
                try {
 | 
			
		||||
 | 
			
		||||
@ -448,7 +448,11 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
    private void scheduleDownlinkMsgsPackSend(int attempt) {
 | 
			
		||||
        Runnable sendDownlinkMsgsTask = () -> {
 | 
			
		||||
            try {
 | 
			
		||||
                if (isConnected() && !sessionState.getPendingMsgsMap().values().isEmpty()) {
 | 
			
		||||
                if (!isConnected()) {
 | 
			
		||||
                    stopCurrentSendDownlinkMsgsTask(true);
 | 
			
		||||
                    return;
 | 
			
		||||
                }
 | 
			
		||||
                if (!sessionState.getPendingMsgsMap().values().isEmpty()) {
 | 
			
		||||
                    List<DownlinkMsg> copy = new ArrayList<>(sessionState.getPendingMsgsMap().values());
 | 
			
		||||
                    if (attempt > 1) {
 | 
			
		||||
                        String error = "Failed to deliver the batch";
 | 
			
		||||
@ -529,6 +533,11 @@ public abstract class EdgeGrpcSession implements Closeable {
 | 
			
		||||
                log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg);
 | 
			
		||||
            } else {
 | 
			
		||||
                log.error("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg());
 | 
			
		||||
                DownlinkMsg downlinkMsg = sessionState.getPendingMsgsMap().get(msg.getDownlinkMsgId());
 | 
			
		||||
                // if NOT timeseries or attributes failures - ack failed downlink
 | 
			
		||||
                if (downlinkMsg.getEntityDataCount() == 0) {
 | 
			
		||||
                    sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            if (sessionState.getPendingMsgsMap().isEmpty()) {
 | 
			
		||||
                log.debug("[{}][{}][{}] Pending msgs map is empty. Stopping current iteration", tenantId, edge.getId(), sessionId);
 | 
			
		||||
 | 
			
		||||
@ -87,6 +87,7 @@ public class EdgeSyncCursor {
 | 
			
		||||
            fetchers.add(new SystemWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService()));
 | 
			
		||||
            fetchers.add(new TenantWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService()));
 | 
			
		||||
            fetchers.add(new OtaPackagesEdgeEventFetcher(ctx.getOtaPackageService()));
 | 
			
		||||
            fetchers.add(new DeviceProfilesEdgeEventFetcher(ctx.getDeviceProfileService()));
 | 
			
		||||
            fetchers.add(new TenantResourcesEdgeEventFetcher(ctx.getResourceService()));
 | 
			
		||||
            fetchers.add(new OAuth2EdgeEventFetcher(ctx.getDomainService()));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -54,23 +54,25 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
 | 
			
		||||
            log.trace("[{}] Finding general edge events [{}], seqIdStart = {}, pageLink = {}",
 | 
			
		||||
                    tenantId, edge.getId(), seqIdStart, pageLink);
 | 
			
		||||
            PageData<EdgeEvent> edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), seqIdStart, null, (TimePageLink) pageLink);
 | 
			
		||||
            if (edgeEvents.getData().isEmpty()) {
 | 
			
		||||
            if (!edgeEvents.getData().isEmpty()) {
 | 
			
		||||
                return edgeEvents;
 | 
			
		||||
            }
 | 
			
		||||
            if (seqIdStart > this.maxReadRecordsCount) {
 | 
			
		||||
                edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), 0L, Math.max(this.maxReadRecordsCount, seqIdStart - this.maxReadRecordsCount), (TimePageLink) pageLink);
 | 
			
		||||
                if (edgeEvents.getData().stream().anyMatch(ee -> ee.getSeqId() < seqIdStart)) {
 | 
			
		||||
                    log.info("[{}] seqId column of edge_event table started new cycle [{}]", tenantId, edge.getId());
 | 
			
		||||
                    this.seqIdNewCycleStarted = true;
 | 
			
		||||
                    this.seqIdStart = 0L;
 | 
			
		||||
                } else {
 | 
			
		||||
                    edgeEvents = new PageData<>();
 | 
			
		||||
                    log.warn("[{}] unexpected edge notification message received. " +
 | 
			
		||||
                            "no new events found and seqId column of edge_event table doesn't started new cycle [{}]", tenantId, edge.getId());
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
                    return edgeEvents;
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[{}] failed to find edge events [{}]", tenantId, edge.getId());
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            log.info("[{}] Unexpected edge notification message received. " +
 | 
			
		||||
                    "No new events found, and the seqId column of the edge_event table has not started a new cycle [{}].", tenantId, edge.getId());
 | 
			
		||||
            return new PageData<>();
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.error("[{}] Failed to find edge events [{}]", tenantId, edge.getId(), e);
 | 
			
		||||
            return new PageData<>();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -120,7 +120,7 @@ public abstract class BaseEdgeProcessor {
 | 
			
		||||
    private boolean doSaveIfEdgeIsOffline(EdgeEventType type, EdgeEventActionType action) {
 | 
			
		||||
        return switch (action) {
 | 
			
		||||
            case TIMESERIES_UPDATED, ALARM_ACK, ALARM_CLEAR, ALARM_ASSIGNED, ALARM_UNASSIGNED, ADDED_COMMENT,
 | 
			
		||||
                 UPDATED_COMMENT -> true;
 | 
			
		||||
                 UPDATED_COMMENT, DELETED -> true;
 | 
			
		||||
            default -> switch (type) {
 | 
			
		||||
                case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE,
 | 
			
		||||
                     WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION, NOTIFICATION_TEMPLATE, NOTIFICATION_TARGET,
 | 
			
		||||
 | 
			
		||||
@ -41,7 +41,10 @@ public abstract class BaseDashboardProcessor extends BaseEdgeProcessor {
 | 
			
		||||
        if (dashboard == null) {
 | 
			
		||||
            throw new RuntimeException("[{" + tenantId + "}] dashboardUpdateMsg {" + dashboardUpdateMsg + "} cannot be converted to dashboard");
 | 
			
		||||
        }
 | 
			
		||||
        Set<ShortCustomerInfo> newAssignedCustomers = new HashSet<>(dashboard.getAssignedCustomers());
 | 
			
		||||
        Set<ShortCustomerInfo> newAssignedCustomers = new HashSet<>();
 | 
			
		||||
        if (dashboard.getAssignedCustomers() != null && !dashboard.getAssignedCustomers().isEmpty()) {
 | 
			
		||||
            newAssignedCustomers.addAll(dashboard.getAssignedCustomers());
 | 
			
		||||
        }
 | 
			
		||||
        Dashboard dashboardById = edgeCtx.getDashboardService().findDashboardById(tenantId, dashboardId);
 | 
			
		||||
        if (dashboardById == null) {
 | 
			
		||||
            created = true;
 | 
			
		||||
 | 
			
		||||
@ -28,17 +28,22 @@ import org.thingsboard.server.gen.edge.v1.WidgetBundleTypesRequestMsg;
 | 
			
		||||
 | 
			
		||||
public interface EdgeRequestsService {
 | 
			
		||||
 | 
			
		||||
    @Deprecated(since = "3.9.1", forRemoval = true)
 | 
			
		||||
    ListenableFuture<Void> processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Void> processAttributesRequestMsg(TenantId tenantId, Edge edge, AttributesRequestMsg attributesRequestMsg);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Void> processRelationRequestMsg(TenantId tenantId, Edge edge, RelationRequestMsg relationRequestMsg);
 | 
			
		||||
 | 
			
		||||
    @Deprecated(since = "3.9.1", forRemoval = true)
 | 
			
		||||
    ListenableFuture<Void> processDeviceCredentialsRequestMsg(TenantId tenantId, Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg);
 | 
			
		||||
 | 
			
		||||
    @Deprecated(since = "3.9.1", forRemoval = true)
 | 
			
		||||
    ListenableFuture<Void> processUserCredentialsRequestMsg(TenantId tenantId, Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg);
 | 
			
		||||
 | 
			
		||||
    @Deprecated(since = "3.9.1", forRemoval = true)
 | 
			
		||||
    ListenableFuture<Void> processWidgetBundleTypesRequestMsg(TenantId tenantId, Edge edge, WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg);
 | 
			
		||||
 | 
			
		||||
    @Deprecated(since = "3.9.1", forRemoval = true)
 | 
			
		||||
    ListenableFuture<Void> processEntityViewsRequestMsg(TenantId tenantId, Edge edge, EntityViewsRequestMsg entityViewsRequestMsg);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -212,9 +212,9 @@ ui:
 | 
			
		||||
database:
 | 
			
		||||
  ts_max_intervals: "${DATABASE_TS_MAX_INTERVALS:700}" # Max number of DB queries generated by a single API call to fetch telemetry records
 | 
			
		||||
  ts:
 | 
			
		||||
    type: "${DATABASE_TS_TYPE:sql}" # cassandra or sql. timescale option is deprecated and will no longer be supported in ThingsBoard 4.0
 | 
			
		||||
    type: "${DATABASE_TS_TYPE:sql}" # cassandra, sql, or timescale (for hybrid mode, DATABASE_TS_TYPE value should be cassandra, or timescale)
 | 
			
		||||
  ts_latest:
 | 
			
		||||
    type: "${DATABASE_TS_LATEST_TYPE:sql}" # cassandra or sql. timescale option is deprecated and will no longer be supported in ThingsBoard 4.0
 | 
			
		||||
    type: "${DATABASE_TS_LATEST_TYPE:sql}" # cassandra, sql, or timescale (for hybrid mode, DATABASE_TS_TYPE value should be cassandra, or timescale)
 | 
			
		||||
 | 
			
		||||
# Cassandra driver configuration parameters
 | 
			
		||||
cassandra:
 | 
			
		||||
 | 
			
		||||
@ -17,22 +17,27 @@ package org.thingsboard.server.actors.device;
 | 
			
		||||
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
import org.mockito.Mockito;
 | 
			
		||||
import org.thingsboard.common.util.LinkedHashMapRemoveEldest;
 | 
			
		||||
import org.thingsboard.server.actors.ActorSystemContext;
 | 
			
		||||
import org.thingsboard.server.common.data.id.DeviceId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.dao.device.DeviceService;
 | 
			
		||||
import org.thingsboard.server.service.transport.TbCoreToTransportService;
 | 
			
		||||
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
import static org.hamcrest.CoreMatchers.instanceOf;
 | 
			
		||||
import static org.hamcrest.CoreMatchers.is;
 | 
			
		||||
import static org.hamcrest.CoreMatchers.notNullValue;
 | 
			
		||||
import static org.hamcrest.MatcherAssert.assertThat;
 | 
			
		||||
import static org.mockito.BDDMockito.willReturn;
 | 
			
		||||
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 | 
			
		||||
import static org.mockito.Mockito.mock;
 | 
			
		||||
 | 
			
		||||
public class DeviceActorMessageProcessorTest {
 | 
			
		||||
 | 
			
		||||
    public static final long MAX_CONCURRENT_SESSIONS_PER_DEVICE = 10L;
 | 
			
		||||
    public static final int MAX_CONCURRENT_SESSIONS_PER_DEVICE = 10;
 | 
			
		||||
    ActorSystemContext systemContext;
 | 
			
		||||
    DeviceService deviceService;
 | 
			
		||||
    TenantId tenantId = TenantId.SYS_TENANT_ID;
 | 
			
		||||
@ -44,15 +49,38 @@ public class DeviceActorMessageProcessorTest {
 | 
			
		||||
    public void setUp() {
 | 
			
		||||
        systemContext = mock(ActorSystemContext.class);
 | 
			
		||||
        deviceService = mock(DeviceService.class);
 | 
			
		||||
        willReturn(MAX_CONCURRENT_SESSIONS_PER_DEVICE).given(systemContext).getMaxConcurrentSessionsPerDevice();
 | 
			
		||||
        willReturn((long)MAX_CONCURRENT_SESSIONS_PER_DEVICE).given(systemContext).getMaxConcurrentSessionsPerDevice();
 | 
			
		||||
        willReturn(deviceService).given(systemContext).getDeviceService();
 | 
			
		||||
        processor = new DeviceActorMessageProcessor(systemContext, tenantId, deviceId);
 | 
			
		||||
        willReturn(mock(TbCoreToTransportService.class)).given(systemContext).getTbCoreToTransportService();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenSystemContext_whenNewInstance_thenVerifySessionMapMaxSize() {
 | 
			
		||||
        assertThat(processor.sessions, instanceOf(LinkedHashMapRemoveEldest.class));
 | 
			
		||||
        assertThat(processor.sessions.getMaxEntries(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE));
 | 
			
		||||
        assertThat(processor.sessions.getMaxEntries(), is((long)MAX_CONCURRENT_SESSIONS_PER_DEVICE));
 | 
			
		||||
        assertThat(processor.sessions.getRemovalConsumer(), notNullValue());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void givenFullSessionMap_whenSessionOverflow_thenShouldDeleteAttributeAndRPCSubscriptions() {
 | 
			
		||||
        //givenFullSessionMap
 | 
			
		||||
        for (int i = 0; i < MAX_CONCURRENT_SESSIONS_PER_DEVICE; i++) {
 | 
			
		||||
            UUID sessionID = UUID.randomUUID();
 | 
			
		||||
            processor.sessions.put(sessionID, Mockito.mock(SessionInfoMetaData.class, RETURNS_DEEP_STUBS));
 | 
			
		||||
            processor.attributeSubscriptions.put(sessionID, Mockito.mock(SessionInfo.class));
 | 
			
		||||
            processor.rpcSubscriptions.put(sessionID, Mockito.mock(SessionInfo.class));
 | 
			
		||||
        }
 | 
			
		||||
        assertThat(processor.sessions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE));
 | 
			
		||||
        assertThat(processor.attributeSubscriptions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE));
 | 
			
		||||
        assertThat(processor.rpcSubscriptions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE));
 | 
			
		||||
 | 
			
		||||
        //add one more
 | 
			
		||||
        processor.sessions.put(UUID.randomUUID(), Mockito.mock(SessionInfoMetaData.class));
 | 
			
		||||
 | 
			
		||||
        assertThat(processor.sessions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE));
 | 
			
		||||
        assertThat(processor.attributeSubscriptions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE-1));
 | 
			
		||||
        assertThat(processor.rpcSubscriptions.size(), is(MAX_CONCURRENT_SESSIONS_PER_DEVICE-1));
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -897,7 +897,7 @@ public class EdgeControllerTest extends AbstractControllerTest {
 | 
			
		||||
        edgeImitator.ignoreType(OAuth2ClientUpdateMsg.class);
 | 
			
		||||
        edgeImitator.ignoreType(OAuth2DomainUpdateMsg.class);
 | 
			
		||||
 | 
			
		||||
        edgeImitator.expectMessageAmount(26);
 | 
			
		||||
        edgeImitator.expectMessageAmount(27);
 | 
			
		||||
        edgeImitator.connect();
 | 
			
		||||
        waitForMessages(edgeImitator);
 | 
			
		||||
 | 
			
		||||
@ -1003,6 +1003,7 @@ public class EdgeControllerTest extends AbstractControllerTest {
 | 
			
		||||
        Assert.assertTrue(popAdminSettingsMsg(edgeImitator.getDownlinkMsgs(), "connectivity"));
 | 
			
		||||
        Assert.assertTrue(popAdminSettingsMsg(edgeImitator.getDownlinkMsgs(), "jwt"));
 | 
			
		||||
        Assert.assertTrue(popDeviceProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default"));
 | 
			
		||||
        Assert.assertTrue(popDeviceProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default"));
 | 
			
		||||
        Assert.assertTrue(popAssetProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default"));
 | 
			
		||||
        Assert.assertTrue(popDeviceProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default"));
 | 
			
		||||
        Assert.assertTrue(popAssetProfileMsg(edgeImitator.getDownlinkMsgs(), UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, "default"));
 | 
			
		||||
 | 
			
		||||
@ -144,7 +144,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
 | 
			
		||||
        edgeImitator = new EdgeImitator("localhost", 7070, edge.getRoutingKey(), edge.getSecret());
 | 
			
		||||
        edgeImitator.ignoreType(OAuth2ClientUpdateMsg.class);
 | 
			
		||||
        edgeImitator.ignoreType(OAuth2DomainUpdateMsg.class);
 | 
			
		||||
        edgeImitator.expectMessageAmount(24);
 | 
			
		||||
        edgeImitator.expectMessageAmount(26);
 | 
			
		||||
        edgeImitator.connect();
 | 
			
		||||
 | 
			
		||||
        requestEdgeRuleChainMetadata();
 | 
			
		||||
@ -265,10 +265,10 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
 | 
			
		||||
 | 
			
		||||
        // 4 messages
 | 
			
		||||
        // - 1 from default profile fetcher
 | 
			
		||||
        // - 2 from device profile fetcher (default and thermostat)
 | 
			
		||||
        // - 4 from device profile fetcher (2 * (default and thermostat) before and after ota packages fetcher
 | 
			
		||||
        // - 1 from device fetcher
 | 
			
		||||
        validateMsgsCnt(DeviceProfileUpdateMsg.class, 4);
 | 
			
		||||
        validateDeviceProfiles(4);
 | 
			
		||||
        validateMsgsCnt(DeviceProfileUpdateMsg.class, 6);
 | 
			
		||||
        validateDeviceProfiles(6);
 | 
			
		||||
 | 
			
		||||
        // 3 messages
 | 
			
		||||
        // - 1 from default profile fetcher
 | 
			
		||||
@ -656,7 +656,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected RuleChainId createEdgeRuleChainAndAssignToEdge(String ruleChainName) throws Exception {
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
        edgeImitator.expectMessageAmount(2);
 | 
			
		||||
        RuleChain ruleChain = new RuleChain();
 | 
			
		||||
        ruleChain.setName(ruleChainName);
 | 
			
		||||
        ruleChain.setType(RuleChainType.EDGE);
 | 
			
		||||
 | 
			
		||||
@ -48,12 +48,14 @@ public class DashboardEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
    private static final int MOBILE_ORDER = 5;
 | 
			
		||||
    private static final String IMAGE = "";
 | 
			
		||||
 | 
			
		||||
    private static final String DASHBOARD_TITLE = "Edge Test Dashboard";
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testDashboards() throws Exception {
 | 
			
		||||
        // create dashboard and assign to edge
 | 
			
		||||
        edgeImitator.expectMessageAmount(2);
 | 
			
		||||
        Dashboard dashboard = new Dashboard();
 | 
			
		||||
        dashboard.setTitle("Edge Test Dashboard");
 | 
			
		||||
        dashboard.setTitle(DASHBOARD_TITLE);
 | 
			
		||||
        dashboard.setMobileHide(true);
 | 
			
		||||
        dashboard.setImage(IMAGE);
 | 
			
		||||
        dashboard.setMobileOrder(MOBILE_ORDER);
 | 
			
		||||
@ -200,12 +202,27 @@ public class DashboardEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
 | 
			
		||||
        Dashboard foundDashboard = doGet("/api/dashboard/" + dashboard.getUuidId(), Dashboard.class);
 | 
			
		||||
        Assert.assertNotNull(foundDashboard);
 | 
			
		||||
        Assert.assertEquals("Edge Test Dashboard", foundDashboard.getName());
 | 
			
		||||
        Assert.assertEquals(DASHBOARD_TITLE, foundDashboard.getName());
 | 
			
		||||
 | 
			
		||||
        PageData<DashboardInfo> pageData = doGetTypedWithPageLink("/api/customer/" + savedCustomer.getId().toString() + "/dashboards?",
 | 
			
		||||
                new TypeReference<>() {}, new PageLink(100));
 | 
			
		||||
        Assert.assertEquals(1, pageData.getData().size());
 | 
			
		||||
        Assert.assertEquals("Edge Test Dashboard", pageData.getData().get(0).getTitle());
 | 
			
		||||
        Assert.assertEquals(DASHBOARD_TITLE, pageData.getData().get(0).getTitle());
 | 
			
		||||
 | 
			
		||||
        dashboard.setTitle(DASHBOARD_TITLE + " Updated");
 | 
			
		||||
        dashboard.setAssignedCustomers(null);
 | 
			
		||||
        dashboardUpdateMsgBuilder.setEntity(JacksonUtil.toString(dashboard));
 | 
			
		||||
        dashboardUpdateMsgBuilder.setMsgType(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE);
 | 
			
		||||
        uplinkMsgBuilder = UplinkMsg.newBuilder();
 | 
			
		||||
        uplinkMsgBuilder.addDashboardUpdateMsg(dashboardUpdateMsgBuilder.build());
 | 
			
		||||
 | 
			
		||||
        edgeImitator.expectResponsesAmount(1);
 | 
			
		||||
        edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
 | 
			
		||||
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForResponses());
 | 
			
		||||
 | 
			
		||||
        foundDashboard = doGet("/api/dashboard/" + dashboard.getUuidId(), Dashboard.class);
 | 
			
		||||
        Assert.assertEquals(DASHBOARD_TITLE + " Updated", foundDashboard.getName());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -256,7 +273,7 @@ public class DashboardEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        Dashboard dashboard = new Dashboard();
 | 
			
		||||
        dashboard.setId(new DashboardId(UUID.randomUUID()));
 | 
			
		||||
        dashboard.setTenantId(tenantId);
 | 
			
		||||
        dashboard.setTitle("Edge Test Dashboard");
 | 
			
		||||
        dashboard.setTitle(DASHBOARD_TITLE);
 | 
			
		||||
        dashboard.setAssignedCustomers(Sets.newHashSet(new ShortCustomerInfo(savedCustomer.getId(), savedCustomer.getTitle(), savedCustomer.isPublic())));
 | 
			
		||||
        return dashboard;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -126,6 +126,45 @@ public class DeviceProfileEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        unAssignFromEdgeAndDeleteDashboard(thermostatsDashboardId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testDeleteDeviceProfilesWhenEdgeIsOffline() throws Exception {
 | 
			
		||||
        RuleChainId thermostatsRuleChainId = createEdgeRuleChainAndAssignToEdge("Thermostats Rule Chain");
 | 
			
		||||
 | 
			
		||||
        // create device profile
 | 
			
		||||
        DeviceProfile deviceProfile = this.createDeviceProfile("ONE_MORE_DEVICE_PROFILE", null);
 | 
			
		||||
        deviceProfile.setDefaultEdgeRuleChainId(thermostatsRuleChainId);
 | 
			
		||||
        extendDeviceProfileData(deviceProfile);
 | 
			
		||||
        edgeImitator.expectMessageAmount(1);
 | 
			
		||||
        deviceProfile = doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class);
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages());
 | 
			
		||||
        AbstractMessage latestMessage = edgeImitator.getLatestMessage();
 | 
			
		||||
        Assert.assertTrue(latestMessage instanceof DeviceProfileUpdateMsg);
 | 
			
		||||
        DeviceProfileUpdateMsg deviceProfileUpdateMsg = (DeviceProfileUpdateMsg) latestMessage;
 | 
			
		||||
        DeviceProfile deviceProfileMsg = JacksonUtil.fromString(deviceProfileUpdateMsg.getEntity(), DeviceProfile.class, true);
 | 
			
		||||
        Assert.assertNotNull(deviceProfileMsg);
 | 
			
		||||
        Assert.assertEquals(deviceProfile, deviceProfileMsg);
 | 
			
		||||
        Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, deviceProfileUpdateMsg.getMsgType());
 | 
			
		||||
 | 
			
		||||
        // delete profile when edge is offline
 | 
			
		||||
        edgeImitator.disconnect();
 | 
			
		||||
        doDelete("/api/deviceProfile/" + deviceProfile.getUuidId())
 | 
			
		||||
                .andExpect(status().isOk());
 | 
			
		||||
        edgeImitator.connect();
 | 
			
		||||
        // 27 sync message
 | 
			
		||||
        // + 1 delete message
 | 
			
		||||
        edgeImitator.expectMessageAmount(28);
 | 
			
		||||
        Assert.assertTrue(edgeImitator.waitForMessages());
 | 
			
		||||
 | 
			
		||||
        latestMessage = edgeImitator.getLatestMessage();
 | 
			
		||||
        Assert.assertTrue(latestMessage instanceof DeviceProfileUpdateMsg);
 | 
			
		||||
        deviceProfileUpdateMsg = (DeviceProfileUpdateMsg) latestMessage;
 | 
			
		||||
        Assert.assertEquals(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE, deviceProfileUpdateMsg.getMsgType());
 | 
			
		||||
        Assert.assertEquals(deviceProfile.getUuidId().getMostSignificantBits(), deviceProfileUpdateMsg.getIdMSB());
 | 
			
		||||
        Assert.assertEquals(deviceProfile.getUuidId().getLeastSignificantBits(), deviceProfileUpdateMsg.getIdLSB());
 | 
			
		||||
 | 
			
		||||
        unAssignFromEdgeAndDeleteRuleChain(thermostatsRuleChainId);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testDeviceProfiles_snmp() throws Exception {
 | 
			
		||||
        DeviceProfile deviceProfile = createDeviceProfileAndDoBasicAssert("SNMP", createSnmpDeviceProfileTransportConfiguration());
 | 
			
		||||
@ -449,4 +488,5 @@ public class DeviceProfileEdgeTest extends AbstractEdgeTest {
 | 
			
		||||
        deviceProfile.setProfileData(createProfileData());
 | 
			
		||||
        return deviceProfile;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -96,6 +96,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
 | 
			
		||||
    private final TbQueueAdmin edgeEventAdmin;
 | 
			
		||||
 | 
			
		||||
    private final AtomicLong consumerCount = new AtomicLong();
 | 
			
		||||
    private final AtomicLong edgeConsumerCount = new AtomicLong();
 | 
			
		||||
 | 
			
		||||
    public KafkaMonolithQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings,
 | 
			
		||||
                                     TbServiceInfoProvider serviceInfoProvider,
 | 
			
		||||
@ -472,7 +473,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
 | 
			
		||||
        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
 | 
			
		||||
        consumerBuilder.settings(kafkaSettings);
 | 
			
		||||
        consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId));
 | 
			
		||||
        consumerBuilder.clientId("monolith-to-edge-event-consumer" + serviceInfoProvider.getServiceId());
 | 
			
		||||
        consumerBuilder.clientId("monolith-to-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet());
 | 
			
		||||
        consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer"));
 | 
			
		||||
        consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
        consumerBuilder.admin(edgeEventAdmin);
 | 
			
		||||
 | 
			
		||||
@ -95,6 +95,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
			
		||||
    private final TbQueueAdmin edgeEventAdmin;
 | 
			
		||||
 | 
			
		||||
    private final AtomicLong consumerCount = new AtomicLong();
 | 
			
		||||
    private final AtomicLong edgeConsumerCount = new AtomicLong();
 | 
			
		||||
 | 
			
		||||
    public KafkaTbCoreQueueFactory(TopicService topicService,
 | 
			
		||||
                                   TbKafkaSettings kafkaSettings,
 | 
			
		||||
@ -421,7 +422,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
 | 
			
		||||
        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
 | 
			
		||||
        consumerBuilder.settings(kafkaSettings);
 | 
			
		||||
        consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId));
 | 
			
		||||
        consumerBuilder.clientId("tb-core-edge-event-consumer" + serviceInfoProvider.getServiceId());
 | 
			
		||||
        consumerBuilder.clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet());
 | 
			
		||||
        consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer"));
 | 
			
		||||
        consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
 | 
			
		||||
        consumerBuilder.admin(edgeEventAdmin);
 | 
			
		||||
 | 
			
		||||
@ -63,7 +63,7 @@
 | 
			
		||||
        </tb-color-input>
 | 
			
		||||
      </div>
 | 
			
		||||
    </div>
 | 
			
		||||
    <div class="tb-form-row space-between flex-1 flex-col">
 | 
			
		||||
    <div class="tb-form-row space-between flex-1 !flex-col">
 | 
			
		||||
      <div class="tb-flex row space-between align-center no-gap fill-width">
 | 
			
		||||
        <div class="fixed-title-width"  translate>widgets.liquid-level-card.shape</div>
 | 
			
		||||
        <tb-toggle-select formControlName="tankSelectionType">
 | 
			
		||||
 | 
			
		||||
@ -18,7 +18,7 @@
 | 
			
		||||
<ng-container *ngIf="levelCardWidgetSettingsForm"  [formGroup]="levelCardWidgetSettingsForm">
 | 
			
		||||
  <div class="tb-form-panel no-padding no-border">
 | 
			
		||||
    <div class="tb-form-panel">
 | 
			
		||||
      <div class="tb-form-row space-between flex flex-1 flex-col">
 | 
			
		||||
      <div class="tb-form-row space-between flex flex-1 !flex-col">
 | 
			
		||||
        <div class="tb-flex row space-between align-center no-gap fill-width">
 | 
			
		||||
          <div class="fixed-title-width" translate>widgets.liquid-level-card.shape</div>
 | 
			
		||||
          <tb-toggle-select formControlName="tankSelectionType">
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user