diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 1453f1a85c..75bb8e10e9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -498,9 +498,9 @@ public final class EdgeGrpcSession implements Closeable { log.trace("[{}][{}][{}] downlink msg(s) are going to be send.", this.tenantId, this.sessionId, copy.size()); for (DownlinkMsg downlinkMsg : copy) { if (this.clientMaxInboundMessageSize != 0 && downlinkMsg.getSerializedSize() > this.clientMaxInboundMessageSize) { - String error = String.format("Client max inbound message size [{%s}] is exceeded. Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE " + + String error = String.format("Client max inbound message size %s is exceeded. Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE " + "env variable on the edge and restart it.", this.clientMaxInboundMessageSize); - String message = String.format("Downlink msg size [{%s}] exceeds client max inbound message size [{%s}]. " + + String message = String.format("Downlink msg size %s exceeds client max inbound message size %s. " + "Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE env variable on the edge and restart it.", downlinkMsg.getSerializedSize(), this.clientMaxInboundMessageSize); log.error("[{}][{}][{}] {} Message {}", this.tenantId, edge.getId(), this.sessionId, message, downlinkMsg); ctx.getNotificationRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId) @@ -551,35 +551,14 @@ public final class EdgeGrpcSession implements Closeable { DownlinkMsg downlinkMsg = null; try { switch (edgeEvent.getAction()) { - case UPDATED: - case ADDED: - case DELETED: - case ASSIGNED_TO_EDGE: - case UNASSIGNED_FROM_EDGE: - case ALARM_ACK: - case ALARM_CLEAR: - case ALARM_DELETE: - case CREDENTIALS_UPDATED: - case RELATION_ADD_OR_UPDATE: - case RELATION_DELETED: - case CREDENTIALS_REQUEST: - case RPC_CALL: - case ASSIGNED_TO_CUSTOMER: - case UNASSIGNED_FROM_CUSTOMER: - case ADDED_COMMENT: - case UPDATED_COMMENT: - case DELETED_COMMENT: + case UPDATED, ADDED, DELETED, ASSIGNED_TO_EDGE, UNASSIGNED_FROM_EDGE, ALARM_ACK, ALARM_CLEAR, ALARM_DELETE, CREDENTIALS_UPDATED, RELATION_ADD_OR_UPDATE, RELATION_DELETED, CREDENTIALS_REQUEST, RPC_CALL, ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> { downlinkMsg = convertEntityEventToDownlink(edgeEvent); log.trace("[{}][{}] entity message processed [{}]", this.tenantId, this.sessionId, downlinkMsg); - break; - case ATTRIBUTES_UPDATED: - case POST_ATTRIBUTES: - case ATTRIBUTES_DELETED: - case TIMESERIES_UPDATED: - downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edgeEvent); - break; - default: - log.warn("[{}][{}] Unsupported action type [{}]", this.tenantId, this.sessionId, edgeEvent.getAction()); + } + case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> + downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent); + default -> + log.warn("[{}][{}] Unsupported action type [{}]", this.tenantId, this.sessionId, edgeEvent.getAction()); } } catch (Exception e) { log.error("[{}][{}] Exception during converting edge event to downlink msg", this.tenantId, this.sessionId, e); @@ -857,7 +836,7 @@ public final class EdgeGrpcSession implements Closeable { .build(); } String error = "Failed to validate the edge!"; - String failureMsg = String.format("{%s} Provided request secret: %s", error, request.getEdgeSecret()); + String failureMsg = String.format("%s Provided request secret: %s", error, request.getEdgeSecret()); ctx.getNotificationRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId()) .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(error).build()); return ConnectResponseMsg.newBuilder() diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index cc86826d41..db0bbd8aa5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -25,7 +25,6 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.Dashboard; -import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EdgeUtils; @@ -93,7 +92,6 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; -import org.thingsboard.server.service.entitiy.TbLogEntityActionService; import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructorFactory; import org.thingsboard.server.service.edge.rpc.constructor.asset.AssetMsgConstructorFactory; import org.thingsboard.server.service.edge.rpc.constructor.customer.CustomerMsgConstructorFactory; @@ -115,6 +113,7 @@ import org.thingsboard.server.service.edge.rpc.constructor.widget.WidgetMsgConst import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmEdgeProcessorFactory; import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessorFactory; import org.thingsboard.server.service.edge.rpc.processor.entityview.EntityViewProcessorFactory; +import org.thingsboard.server.service.entitiy.TbLogEntityActionService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; @@ -356,35 +355,15 @@ public abstract class BaseEdgeProcessor { private boolean doSaveIfEdgeIsOffline(EdgeEventType type, EdgeEventActionType action) { - switch (action) { - case TIMESERIES_UPDATED: - case ALARM_ACK: - case ALARM_CLEAR: - case ALARM_ASSIGNED: - case ALARM_UNASSIGNED: - case CREDENTIALS_REQUEST: - case ADDED_COMMENT: - case UPDATED_COMMENT: - return true; - } - switch (type) { - case ALARM: - case ALARM_COMMENT: - case RULE_CHAIN: - case RULE_CHAIN_METADATA: - case USER: - case CUSTOMER: - case TENANT: - case TENANT_PROFILE: - case WIDGETS_BUNDLE: - case WIDGET_TYPE: - case ADMIN_SETTINGS: - case OTA_PACKAGE: - case QUEUE: - case RELATION: - return true; - } - return false; + return switch (action) { + case TIMESERIES_UPDATED, ALARM_ACK, ALARM_CLEAR, ALARM_ASSIGNED, ALARM_UNASSIGNED, CREDENTIALS_REQUEST, ADDED_COMMENT, UPDATED_COMMENT -> + true; + default -> switch (type) { + case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE, WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION -> + true; + default -> false; + }; + }; } private ListenableFuture doSaveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, EdgeEventActionType action, EntityId entityId, JsonNode body) { @@ -435,31 +414,17 @@ public abstract class BaseEdgeProcessor { } protected UpdateMsgType getUpdateMsgType(EdgeEventActionType actionType) { - switch (actionType) { - case UPDATED: - case CREDENTIALS_UPDATED: - case ASSIGNED_TO_CUSTOMER: - case UNASSIGNED_FROM_CUSTOMER: - case UPDATED_COMMENT: - return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE; - case ADDED: - case ASSIGNED_TO_EDGE: - case RELATION_ADD_OR_UPDATE: - case ADDED_COMMENT: - return UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE; - case DELETED: - case UNASSIGNED_FROM_EDGE: - case RELATION_DELETED: - case DELETED_COMMENT: - case ALARM_DELETE: - return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE; - case ALARM_ACK: - return UpdateMsgType.ALARM_ACK_RPC_MESSAGE; - case ALARM_CLEAR: - return UpdateMsgType.ALARM_CLEAR_RPC_MESSAGE; - default: - throw new RuntimeException("Unsupported actionType [" + actionType + "]"); - } + return switch (actionType) { + case UPDATED, CREDENTIALS_UPDATED, ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, UPDATED_COMMENT -> + UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE; + case ADDED, ASSIGNED_TO_EDGE, RELATION_ADD_OR_UPDATE, ADDED_COMMENT -> + UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE; + case DELETED, UNASSIGNED_FROM_EDGE, RELATION_DELETED, DELETED_COMMENT, ALARM_DELETE -> + UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE; + case ALARM_ACK -> UpdateMsgType.ALARM_ACK_RPC_MESSAGE; + case ALARM_CLEAR -> UpdateMsgType.ALARM_CLEAR_RPC_MESSAGE; + default -> throw new RuntimeException("Unsupported actionType [" + actionType + "]"); + }; } public ListenableFuture processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { @@ -554,15 +519,11 @@ public abstract class BaseEdgeProcessor { } private ListenableFuture processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) { - switch (actionType) { - case ADDED: - case UPDATED: - case DELETED: - case CREDENTIALS_UPDATED: // used by USER entity - return processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId); - default: - return Futures.immediateFuture(null); - } + return switch (actionType) { + case ADDED, UPDATED, DELETED, CREDENTIALS_UPDATED -> // used by USER entity + processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId); + default -> Futures.immediateFuture(null); + }; } protected EntityId constructEntityId(String entityTypeStr, long entityIdMSB, long entityIdLSB) { @@ -605,26 +566,18 @@ public abstract class BaseEdgeProcessor { } protected boolean isEntityExists(TenantId tenantId, EntityId entityId) { - switch (entityId.getEntityType()) { - case TENANT: - return tenantService.findTenantById(tenantId) != null; - case DEVICE: - return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null; - case ASSET: - return assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null; - case ENTITY_VIEW: - return entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null; - case CUSTOMER: - return customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null; - case USER: - return userService.findUserById(tenantId, new UserId(entityId.getId())) != null; - case DASHBOARD: - return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null; - case EDGE: - return edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null; - default: - return false; - } + return switch (entityId.getEntityType()) { + case TENANT -> tenantService.findTenantById(tenantId) != null; + case DEVICE -> deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null; + case ASSET -> assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null; + case ENTITY_VIEW -> + entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null; + case CUSTOMER -> customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null; + case USER -> userService.findUserById(tenantId, new UserId(entityId.getId())) != null; + case DASHBOARD -> dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null; + case EDGE -> edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null; + default -> false; + }; } protected void createRelationFromEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId) { @@ -663,37 +616,25 @@ public abstract class BaseEdgeProcessor { } protected AssetProfile checkIfAssetProfileDefaultFieldsAssignedToEdge(TenantId tenantId, EdgeId edgeId, AssetProfile assetProfile, EdgeVersion edgeVersion) { - switch (edgeVersion) { - case V_3_3_3: - case V_3_3_0: - case V_3_4_0: - if (assetProfile.getDefaultDashboardId() != null - && isEntityNotAssignedToEdge(tenantId, assetProfile.getDefaultDashboardId(), edgeId)) { - assetProfile.setDefaultDashboardId(null); - } - if (assetProfile.getDefaultEdgeRuleChainId() != null - && isEntityNotAssignedToEdge(tenantId, assetProfile.getDefaultEdgeRuleChainId(), edgeId)) { - assetProfile.setDefaultEdgeRuleChainId(null); - } - break; + if (EdgeVersion.V_3_3_0.equals(edgeVersion) || EdgeVersion.V_3_3_3.equals(edgeVersion) || EdgeVersion.V_3_4_0.equals(edgeVersion)) { + if (assetProfile.getDefaultDashboardId() != null && isEntityNotAssignedToEdge(tenantId, assetProfile.getDefaultDashboardId(), edgeId)) { + assetProfile.setDefaultDashboardId(null); + } + if (assetProfile.getDefaultEdgeRuleChainId() != null && isEntityNotAssignedToEdge(tenantId, assetProfile.getDefaultEdgeRuleChainId(), edgeId)) { + assetProfile.setDefaultEdgeRuleChainId(null); + } } return assetProfile; } protected DeviceProfile checkIfDeviceProfileDefaultFieldsAssignedToEdge(TenantId tenantId, EdgeId edgeId, DeviceProfile deviceProfile, EdgeVersion edgeVersion) { - switch (edgeVersion) { - case V_3_3_3: - case V_3_3_0: - case V_3_4_0: - if (deviceProfile.getDefaultDashboardId() != null - && isEntityNotAssignedToEdge(tenantId, deviceProfile.getDefaultDashboardId(), edgeId)) { - deviceProfile.setDefaultDashboardId(null); - } - if (deviceProfile.getDefaultEdgeRuleChainId() != null - && isEntityNotAssignedToEdge(tenantId, deviceProfile.getDefaultEdgeRuleChainId(), edgeId)) { - deviceProfile.setDefaultEdgeRuleChainId(null); - } - break; + if (EdgeVersion.V_3_3_0.equals(edgeVersion) || EdgeVersion.V_3_3_3.equals(edgeVersion) || EdgeVersion.V_3_4_0.equals(edgeVersion)) { + if (deviceProfile.getDefaultDashboardId() != null && isEntityNotAssignedToEdge(tenantId, deviceProfile.getDefaultDashboardId(), edgeId)) { + deviceProfile.setDefaultDashboardId(null); + } + if (deviceProfile.getDefaultEdgeRuleChainId() != null && isEntityNotAssignedToEdge(tenantId, deviceProfile.getDefaultEdgeRuleChainId(), edgeId)) { + deviceProfile.setDefaultEdgeRuleChainId(null); + } } return deviceProfile; } @@ -708,4 +649,5 @@ public abstract class BaseEdgeProcessor { } return true; } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessor.java index e69c479d93..4e9a0b82d8 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessor.java @@ -16,12 +16,18 @@ package org.thingsboard.server.service.edge.rpc.processor.telemetry; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -31,18 +37,28 @@ import org.thingsboard.server.queue.util.TbCoreComponent; @TbCoreComponent public class TelemetryEdgeProcessor extends BaseTelemetryProcessor { + @Value("${edges.rpc.max_telemetry_message_size:0}") + private int maxTelemetryMessageSize; + + @Lazy + @Autowired + private NotificationRuleProcessor notificationRuleProcessor; + @Override protected String getMsgSourceKey() { return DataConstants.EDGE_MSG_SOURCE; } - public DownlinkMsg convertTelemetryEventToDownlink(EdgeEvent edgeEvent) { + public DownlinkMsg convertTelemetryEventToDownlink(Edge edge, EdgeEvent edgeEvent) { if (edgeEvent.getBody() != null) { String bodyStr = edgeEvent.getBody().toString(); - if (bodyStr.length() > 1000) { - log.debug("[{}][{}][{}] Conversion to a DownlinkMsg telemetry event failed due to a size limit violation. " + - "Current size is {}, but the limit is 1000. {}", edgeEvent.getTenantId(), edgeEvent.getEdgeId(), - edgeEvent.getEntityId(), bodyStr.length(), StringUtils.truncate(bodyStr, 100)); + if (maxTelemetryMessageSize > 0 && bodyStr.length() > maxTelemetryMessageSize) { + String error = "Conversion to a DownlinkMsg telemetry event failed due to a size limit violation."; + String message = String.format("%s Current size is %s, but the limit is %s", error, bodyStr.length(), maxTelemetryMessageSize); + log.debug("[{}][{}][{}] {}. {}", edgeEvent.getTenantId(), edgeEvent.getEdgeId(), + edgeEvent.getEntityId(), message, StringUtils.truncate(bodyStr, 100)); + notificationRuleProcessor.process(EdgeCommunicationFailureTrigger.builder().tenantId(edgeEvent.getTenantId()) + .edgeId(edgeEvent.getEdgeId()).customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(message).error(error).build()); return null; } } @@ -55,4 +71,5 @@ public class TelemetryEdgeProcessor extends BaseTelemetryProcessor { .addEntityData(entityDataProto) .build(); } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/tenant/TenantEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/tenant/TenantEdgeProcessor.java index 2c192fab1d..515dccb987 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/tenant/TenantEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/tenant/TenantEdgeProcessor.java @@ -60,4 +60,5 @@ public class TenantEdgeProcessor extends BaseEdgeProcessor { } return downlinkMsg; } + } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index daf9f5d1c1..7616fa7805 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1304,6 +1304,8 @@ edges: private_key: "${EDGES_RPC_SSL_PRIVATE_KEY:privateKeyFile.pem}" # Maximum size (in bytes) of inbound messages the cloud can handle from the edge. By default, it can handle messages up to 4 Megabytes max_inbound_message_size: "${EDGES_RPC_MAX_INBOUND_MESSAGE_SIZE:4194304}" + # Maximum length of telemetry (time-series and attributes) message the cloud sends to the edge. By default, there is no limitation. + max_telemetry_message_size: "${EDGES_RPC_MAX_TELEMETRY_MESSAGE_SIZE:300}" storage: # Max records of edge event to read from DB and sent to the edge max_read_records_count: "${EDGES_STORAGE_MAX_READ_RECORDS_COUNT:50}" diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java index 4e5b080b65..f3ed8e12e6 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java @@ -526,17 +526,18 @@ public abstract class BaseEdgeProcessorTest { } protected static Stream provideParameters() { - UUID dashoboardUUID = UUID.randomUUID(); - UUID ruleChaindUUID = UUID.randomUUID(); + UUID dashboardUUID = UUID.randomUUID(); + UUID ruleChainUUID = UUID.randomUUID(); return Stream.of( Arguments.of(EdgeVersion.V_3_3_0, 0, 0, 0, 0), Arguments.of(EdgeVersion.V_3_3_3, 0, 0, 0, 0), Arguments.of(EdgeVersion.V_3_4_0, 0, 0, 0, 0), Arguments.of(EdgeVersion.V_3_6_0, - dashoboardUUID.getMostSignificantBits(), - dashoboardUUID.getLeastSignificantBits(), - ruleChaindUUID.getMostSignificantBits(), - ruleChaindUUID.getLeastSignificantBits()) + dashboardUUID.getMostSignificantBits(), + dashboardUUID.getLeastSignificantBits(), + ruleChainUUID.getMostSignificantBits(), + ruleChainUUID.getLeastSignificantBits()) ); } + } diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessorTest.java index a6e28e7968..94714af463 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessorTest.java @@ -16,27 +16,51 @@ package org.thingsboard.server.service.edge.rpc.processor.telemetry; import com.fasterxml.jackson.databind.node.ObjectNode; -import lombok.extern.slf4j.Slf4j; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.Mockito; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessorTest; -@Slf4j -@RunWith(MockitoJUnitRunner.class) -public class TelemetryEdgeProcessorTest { +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; + +@RunWith(SpringRunner.class) +@ContextConfiguration(classes = {TelemetryEdgeProcessor.class}) +@TestPropertySource(properties = { + "edges.rpc.max_telemetry_message_size=1000" +}) +public class TelemetryEdgeProcessorTest extends BaseEdgeProcessorTest { + + @SpyBean + private TelemetryEdgeProcessor telemetryEdgeProcessor; + + @MockBean + private NotificationRuleProcessor notificationRuleProcessor; @Test - public void testConvert_maxSizeLimit() throws Exception { + public void testConvert_maxSizeLimit() { + Edge edge = new Edge(); EdgeEvent edgeEvent = new EdgeEvent(); ObjectNode body = JacksonUtil.newObjectNode(); - body.put("value", StringUtils.randomAlphanumeric(10000)); + body.put("value", StringUtils.randomAlphanumeric(1000)); edgeEvent.setBody(body); - DownlinkMsg downlinkMsg = new TelemetryEdgeProcessor().convertTelemetryEventToDownlink(edgeEvent); + + DownlinkMsg downlinkMsg = telemetryEdgeProcessor.convertTelemetryEventToDownlink(edge, edgeEvent); Assert.assertNull(downlinkMsg); + + verify(notificationRuleProcessor, Mockito.times(1)).process(any()); } + } diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index ca6e826cac..38f84bb4a1 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -434,36 +434,28 @@ public class ProtoUtils { } public static TransportProtos.ToDeviceActorNotificationMsgProto toProto(ToDeviceActorNotificationMsg msg) { - if (msg instanceof DeviceEdgeUpdateMsg) { - DeviceEdgeUpdateMsg updateMsg = (DeviceEdgeUpdateMsg) msg; + if (msg instanceof DeviceEdgeUpdateMsg updateMsg) { TransportProtos.DeviceEdgeUpdateMsgProto proto = toProto(updateMsg); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceEdgeUpdateMsg(proto).build(); - } else if (msg instanceof DeviceNameOrTypeUpdateMsg) { - DeviceNameOrTypeUpdateMsg updateMsg = (DeviceNameOrTypeUpdateMsg) msg; + } else if (msg instanceof DeviceNameOrTypeUpdateMsg updateMsg) { TransportProtos.DeviceNameOrTypeUpdateMsgProto proto = toProto(updateMsg); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceNameOrTypeMsg(proto).build(); - } else if (msg instanceof DeviceAttributesEventNotificationMsg) { - DeviceAttributesEventNotificationMsg updateMsg = (DeviceAttributesEventNotificationMsg) msg; + } else if (msg instanceof DeviceAttributesEventNotificationMsg updateMsg) { TransportProtos.DeviceAttributesEventMsgProto proto = toProto(updateMsg); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceAttributesEventMsg(proto).build(); - } else if (msg instanceof DeviceCredentialsUpdateNotificationMsg) { - DeviceCredentialsUpdateNotificationMsg updateMsg = (DeviceCredentialsUpdateNotificationMsg) msg; + } else if (msg instanceof DeviceCredentialsUpdateNotificationMsg updateMsg) { TransportProtos.DeviceCredentialsUpdateMsgProto proto = toProto(updateMsg); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceCredentialsUpdateMsg(proto).build(); - } else if (msg instanceof ToDeviceRpcRequestActorMsg) { - ToDeviceRpcRequestActorMsg updateMsg = (ToDeviceRpcRequestActorMsg) msg; + } else if (msg instanceof ToDeviceRpcRequestActorMsg updateMsg) { TransportProtos.ToDeviceRpcRequestActorMsgProto proto = toProto(updateMsg); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setToDeviceRpcRequestMsg(proto).build(); - } else if (msg instanceof FromDeviceRpcResponseActorMsg) { - FromDeviceRpcResponseActorMsg updateMsg = (FromDeviceRpcResponseActorMsg) msg; + } else if (msg instanceof FromDeviceRpcResponseActorMsg updateMsg) { TransportProtos.FromDeviceRpcResponseActorMsgProto proto = toProto(updateMsg); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setFromDeviceRpcResponseMsg(proto).build(); - } else if (msg instanceof RemoveRpcActorMsg) { - RemoveRpcActorMsg updateMsg = (RemoveRpcActorMsg) msg; + } else if (msg instanceof RemoveRpcActorMsg updateMsg) { TransportProtos.RemoveRpcActorMsgProto proto = toProto(updateMsg); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setRemoveRpcActorMsg(proto).build(); - } else if (msg instanceof DeviceDeleteMsg) { - DeviceDeleteMsg updateMsg = (DeviceDeleteMsg) msg; + } else if (msg instanceof DeviceDeleteMsg updateMsg) { TransportProtos.DeviceDeleteMsgProto proto = toProto(updateMsg); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceDeleteMsg(proto).build(); } @@ -507,24 +499,14 @@ public class ProtoUtils { List result = new ArrayList<>(); for (TransportProtos.AttributeValueProto kvEntry : valuesList) { boolean hasValue = kvEntry.getHasV(); - KvEntry entry = null; - switch (kvEntry.getType()) { - case BOOLEAN_V: - entry = new BooleanDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getBoolV() : null); - break; - case LONG_V: - entry = new LongDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getLongV() : null); - break; - case DOUBLE_V: - entry = new DoubleDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getDoubleV() : null); - break; - case STRING_V: - entry = new StringDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getStringV() : null); - break; - case JSON_V: - entry = new JsonDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getJsonV() : null); - break; - } + KvEntry entry = switch (kvEntry.getType()) { + case BOOLEAN_V -> new BooleanDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getBoolV() : null); + case LONG_V -> new LongDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getLongV() : null); + case DOUBLE_V -> new DoubleDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getDoubleV() : null); + case STRING_V -> new StringDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getStringV() : null); + case JSON_V -> new JsonDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getJsonV() : null); + default -> null; + }; result.add(new BaseAttributeKvEntry(kvEntry.getLastUpdateTs(), entry)); } return result; @@ -1029,15 +1011,11 @@ public class ProtoUtils { .setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits()) .setAdditionalInfo(JacksonUtil.toString(device.getAdditionalInfo())); - PowerSavingConfiguration psmConfiguration = null; - switch (device.getDeviceData().getTransportConfiguration().getType()) { - case LWM2M: - psmConfiguration = (Lwm2mDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration(); - break; - case COAP: - psmConfiguration = (CoapDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration(); - break; - } + PowerSavingConfiguration psmConfiguration = switch (device.getDeviceData().getTransportConfiguration().getType()) { + case LWM2M -> (Lwm2mDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration(); + case COAP -> (CoapDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration(); + default -> null; + }; if (psmConfiguration != null) { PowerMode powerMode = psmConfiguration.getPowerMode(); @@ -1079,4 +1057,5 @@ public class ProtoUtils { private static Long checkLong(Long l) { return isNotNull(l) ? l : 0; } + }