Merge pull request #10471 from AndriiLandiak/fix/edge-telemetry-violation-msg

Yaml config for max size of telemetry message sent to Edge
This commit is contained in:
Volodymyr Babak 2024-04-01 15:53:45 +03:00 committed by GitHub
commit d338f298a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 148 additions and 203 deletions

View File

@ -498,9 +498,9 @@ public final class EdgeGrpcSession implements Closeable {
log.trace("[{}][{}][{}] downlink msg(s) are going to be send.", this.tenantId, this.sessionId, copy.size()); log.trace("[{}][{}][{}] downlink msg(s) are going to be send.", this.tenantId, this.sessionId, copy.size());
for (DownlinkMsg downlinkMsg : copy) { for (DownlinkMsg downlinkMsg : copy) {
if (this.clientMaxInboundMessageSize != 0 && downlinkMsg.getSerializedSize() > this.clientMaxInboundMessageSize) { if (this.clientMaxInboundMessageSize != 0 && downlinkMsg.getSerializedSize() > this.clientMaxInboundMessageSize) {
String error = String.format("Client max inbound message size [{%s}] is exceeded. Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE " + String error = String.format("Client max inbound message size %s is exceeded. Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE " +
"env variable on the edge and restart it.", this.clientMaxInboundMessageSize); "env variable on the edge and restart it.", this.clientMaxInboundMessageSize);
String message = String.format("Downlink msg size [{%s}] exceeds client max inbound message size [{%s}]. " + String message = String.format("Downlink msg size %s exceeds client max inbound message size %s. " +
"Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE env variable on the edge and restart it.", downlinkMsg.getSerializedSize(), this.clientMaxInboundMessageSize); "Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE env variable on the edge and restart it.", downlinkMsg.getSerializedSize(), this.clientMaxInboundMessageSize);
log.error("[{}][{}][{}] {} Message {}", this.tenantId, edge.getId(), this.sessionId, message, downlinkMsg); log.error("[{}][{}][{}] {} Message {}", this.tenantId, edge.getId(), this.sessionId, message, downlinkMsg);
ctx.getNotificationRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId) ctx.getNotificationRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId)
@ -551,35 +551,14 @@ public final class EdgeGrpcSession implements Closeable {
DownlinkMsg downlinkMsg = null; DownlinkMsg downlinkMsg = null;
try { try {
switch (edgeEvent.getAction()) { switch (edgeEvent.getAction()) {
case UPDATED: case UPDATED, ADDED, DELETED, ASSIGNED_TO_EDGE, UNASSIGNED_FROM_EDGE, ALARM_ACK, ALARM_CLEAR, ALARM_DELETE, CREDENTIALS_UPDATED, RELATION_ADD_OR_UPDATE, RELATION_DELETED, CREDENTIALS_REQUEST, RPC_CALL, ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> {
case ADDED:
case DELETED:
case ASSIGNED_TO_EDGE:
case UNASSIGNED_FROM_EDGE:
case ALARM_ACK:
case ALARM_CLEAR:
case ALARM_DELETE:
case CREDENTIALS_UPDATED:
case RELATION_ADD_OR_UPDATE:
case RELATION_DELETED:
case CREDENTIALS_REQUEST:
case RPC_CALL:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
case ADDED_COMMENT:
case UPDATED_COMMENT:
case DELETED_COMMENT:
downlinkMsg = convertEntityEventToDownlink(edgeEvent); downlinkMsg = convertEntityEventToDownlink(edgeEvent);
log.trace("[{}][{}] entity message processed [{}]", this.tenantId, this.sessionId, downlinkMsg); log.trace("[{}][{}] entity message processed [{}]", this.tenantId, this.sessionId, downlinkMsg);
break; }
case ATTRIBUTES_UPDATED: case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED ->
case POST_ATTRIBUTES: downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
case ATTRIBUTES_DELETED: default ->
case TIMESERIES_UPDATED: log.warn("[{}][{}] Unsupported action type [{}]", this.tenantId, this.sessionId, edgeEvent.getAction());
downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edgeEvent);
break;
default:
log.warn("[{}][{}] Unsupported action type [{}]", this.tenantId, this.sessionId, edgeEvent.getAction());
} }
} catch (Exception e) { } catch (Exception e) {
log.error("[{}][{}] Exception during converting edge event to downlink msg", this.tenantId, this.sessionId, e); log.error("[{}][{}] Exception during converting edge event to downlink msg", this.tenantId, this.sessionId, e);
@ -857,7 +836,7 @@ public final class EdgeGrpcSession implements Closeable {
.build(); .build();
} }
String error = "Failed to validate the edge!"; String error = "Failed to validate the edge!";
String failureMsg = String.format("{%s} Provided request secret: %s", error, request.getEdgeSecret()); String failureMsg = String.format("%s Provided request secret: %s", error, request.getEdgeSecret());
ctx.getNotificationRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId()) ctx.getNotificationRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId())
.customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(error).build()); .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(error).build());
return ConnectResponseMsg.newBuilder() return ConnectResponseMsg.newBuilder()

View File

@ -25,7 +25,6 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EdgeUtils;
@ -93,7 +92,6 @@ import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.service.entitiy.TbLogEntityActionService;
import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructorFactory; import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructorFactory;
import org.thingsboard.server.service.edge.rpc.constructor.asset.AssetMsgConstructorFactory; import org.thingsboard.server.service.edge.rpc.constructor.asset.AssetMsgConstructorFactory;
import org.thingsboard.server.service.edge.rpc.constructor.customer.CustomerMsgConstructorFactory; import org.thingsboard.server.service.edge.rpc.constructor.customer.CustomerMsgConstructorFactory;
@ -115,6 +113,7 @@ import org.thingsboard.server.service.edge.rpc.constructor.widget.WidgetMsgConst
import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmEdgeProcessorFactory; import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmEdgeProcessorFactory;
import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessorFactory; import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessorFactory;
import org.thingsboard.server.service.edge.rpc.processor.entityview.EntityViewProcessorFactory; import org.thingsboard.server.service.edge.rpc.processor.entityview.EntityViewProcessorFactory;
import org.thingsboard.server.service.entitiy.TbLogEntityActionService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
@ -356,35 +355,15 @@ public abstract class BaseEdgeProcessor {
private boolean doSaveIfEdgeIsOffline(EdgeEventType type, private boolean doSaveIfEdgeIsOffline(EdgeEventType type,
EdgeEventActionType action) { EdgeEventActionType action) {
switch (action) { return switch (action) {
case TIMESERIES_UPDATED: case TIMESERIES_UPDATED, ALARM_ACK, ALARM_CLEAR, ALARM_ASSIGNED, ALARM_UNASSIGNED, CREDENTIALS_REQUEST, ADDED_COMMENT, UPDATED_COMMENT ->
case ALARM_ACK: true;
case ALARM_CLEAR: default -> switch (type) {
case ALARM_ASSIGNED: case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE, WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION ->
case ALARM_UNASSIGNED: true;
case CREDENTIALS_REQUEST: default -> false;
case ADDED_COMMENT: };
case UPDATED_COMMENT: };
return true;
}
switch (type) {
case ALARM:
case ALARM_COMMENT:
case RULE_CHAIN:
case RULE_CHAIN_METADATA:
case USER:
case CUSTOMER:
case TENANT:
case TENANT_PROFILE:
case WIDGETS_BUNDLE:
case WIDGET_TYPE:
case ADMIN_SETTINGS:
case OTA_PACKAGE:
case QUEUE:
case RELATION:
return true;
}
return false;
} }
private ListenableFuture<Void> doSaveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, EdgeEventActionType action, EntityId entityId, JsonNode body) { private ListenableFuture<Void> doSaveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, EdgeEventActionType action, EntityId entityId, JsonNode body) {
@ -435,31 +414,17 @@ public abstract class BaseEdgeProcessor {
} }
protected UpdateMsgType getUpdateMsgType(EdgeEventActionType actionType) { protected UpdateMsgType getUpdateMsgType(EdgeEventActionType actionType) {
switch (actionType) { return switch (actionType) {
case UPDATED: case UPDATED, CREDENTIALS_UPDATED, ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, UPDATED_COMMENT ->
case CREDENTIALS_UPDATED: UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE;
case ASSIGNED_TO_CUSTOMER: case ADDED, ASSIGNED_TO_EDGE, RELATION_ADD_OR_UPDATE, ADDED_COMMENT ->
case UNASSIGNED_FROM_CUSTOMER: UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE;
case UPDATED_COMMENT: case DELETED, UNASSIGNED_FROM_EDGE, RELATION_DELETED, DELETED_COMMENT, ALARM_DELETE ->
return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE; UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE;
case ADDED: case ALARM_ACK -> UpdateMsgType.ALARM_ACK_RPC_MESSAGE;
case ASSIGNED_TO_EDGE: case ALARM_CLEAR -> UpdateMsgType.ALARM_CLEAR_RPC_MESSAGE;
case RELATION_ADD_OR_UPDATE: default -> throw new RuntimeException("Unsupported actionType [" + actionType + "]");
case ADDED_COMMENT: };
return UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE;
case DELETED:
case UNASSIGNED_FROM_EDGE:
case RELATION_DELETED:
case DELETED_COMMENT:
case ALARM_DELETE:
return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE;
case ALARM_ACK:
return UpdateMsgType.ALARM_ACK_RPC_MESSAGE;
case ALARM_CLEAR:
return UpdateMsgType.ALARM_CLEAR_RPC_MESSAGE;
default:
throw new RuntimeException("Unsupported actionType [" + actionType + "]");
}
} }
public ListenableFuture<Void> processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { public ListenableFuture<Void> processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
@ -554,15 +519,11 @@ public abstract class BaseEdgeProcessor {
} }
private ListenableFuture<Void> processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) { private ListenableFuture<Void> processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) {
switch (actionType) { return switch (actionType) {
case ADDED: case ADDED, UPDATED, DELETED, CREDENTIALS_UPDATED -> // used by USER entity
case UPDATED: processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId);
case DELETED: default -> Futures.immediateFuture(null);
case CREDENTIALS_UPDATED: // used by USER entity };
return processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId);
default:
return Futures.immediateFuture(null);
}
} }
protected EntityId constructEntityId(String entityTypeStr, long entityIdMSB, long entityIdLSB) { protected EntityId constructEntityId(String entityTypeStr, long entityIdMSB, long entityIdLSB) {
@ -605,26 +566,18 @@ public abstract class BaseEdgeProcessor {
} }
protected boolean isEntityExists(TenantId tenantId, EntityId entityId) { protected boolean isEntityExists(TenantId tenantId, EntityId entityId) {
switch (entityId.getEntityType()) { return switch (entityId.getEntityType()) {
case TENANT: case TENANT -> tenantService.findTenantById(tenantId) != null;
return tenantService.findTenantById(tenantId) != null; case DEVICE -> deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null;
case DEVICE: case ASSET -> assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null;
return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null; case ENTITY_VIEW ->
case ASSET: entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null;
return assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null; case CUSTOMER -> customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null;
case ENTITY_VIEW: case USER -> userService.findUserById(tenantId, new UserId(entityId.getId())) != null;
return entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null; case DASHBOARD -> dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null;
case CUSTOMER: case EDGE -> edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null;
return customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null; default -> false;
case USER: };
return userService.findUserById(tenantId, new UserId(entityId.getId())) != null;
case DASHBOARD:
return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null;
case EDGE:
return edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null;
default:
return false;
}
} }
protected void createRelationFromEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId) { protected void createRelationFromEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId) {
@ -663,37 +616,25 @@ public abstract class BaseEdgeProcessor {
} }
protected AssetProfile checkIfAssetProfileDefaultFieldsAssignedToEdge(TenantId tenantId, EdgeId edgeId, AssetProfile assetProfile, EdgeVersion edgeVersion) { protected AssetProfile checkIfAssetProfileDefaultFieldsAssignedToEdge(TenantId tenantId, EdgeId edgeId, AssetProfile assetProfile, EdgeVersion edgeVersion) {
switch (edgeVersion) { if (EdgeVersion.V_3_3_0.equals(edgeVersion) || EdgeVersion.V_3_3_3.equals(edgeVersion) || EdgeVersion.V_3_4_0.equals(edgeVersion)) {
case V_3_3_3: if (assetProfile.getDefaultDashboardId() != null && isEntityNotAssignedToEdge(tenantId, assetProfile.getDefaultDashboardId(), edgeId)) {
case V_3_3_0: assetProfile.setDefaultDashboardId(null);
case V_3_4_0: }
if (assetProfile.getDefaultDashboardId() != null if (assetProfile.getDefaultEdgeRuleChainId() != null && isEntityNotAssignedToEdge(tenantId, assetProfile.getDefaultEdgeRuleChainId(), edgeId)) {
&& isEntityNotAssignedToEdge(tenantId, assetProfile.getDefaultDashboardId(), edgeId)) { assetProfile.setDefaultEdgeRuleChainId(null);
assetProfile.setDefaultDashboardId(null); }
}
if (assetProfile.getDefaultEdgeRuleChainId() != null
&& isEntityNotAssignedToEdge(tenantId, assetProfile.getDefaultEdgeRuleChainId(), edgeId)) {
assetProfile.setDefaultEdgeRuleChainId(null);
}
break;
} }
return assetProfile; return assetProfile;
} }
protected DeviceProfile checkIfDeviceProfileDefaultFieldsAssignedToEdge(TenantId tenantId, EdgeId edgeId, DeviceProfile deviceProfile, EdgeVersion edgeVersion) { protected DeviceProfile checkIfDeviceProfileDefaultFieldsAssignedToEdge(TenantId tenantId, EdgeId edgeId, DeviceProfile deviceProfile, EdgeVersion edgeVersion) {
switch (edgeVersion) { if (EdgeVersion.V_3_3_0.equals(edgeVersion) || EdgeVersion.V_3_3_3.equals(edgeVersion) || EdgeVersion.V_3_4_0.equals(edgeVersion)) {
case V_3_3_3: if (deviceProfile.getDefaultDashboardId() != null && isEntityNotAssignedToEdge(tenantId, deviceProfile.getDefaultDashboardId(), edgeId)) {
case V_3_3_0: deviceProfile.setDefaultDashboardId(null);
case V_3_4_0: }
if (deviceProfile.getDefaultDashboardId() != null if (deviceProfile.getDefaultEdgeRuleChainId() != null && isEntityNotAssignedToEdge(tenantId, deviceProfile.getDefaultEdgeRuleChainId(), edgeId)) {
&& isEntityNotAssignedToEdge(tenantId, deviceProfile.getDefaultDashboardId(), edgeId)) { deviceProfile.setDefaultEdgeRuleChainId(null);
deviceProfile.setDefaultDashboardId(null); }
}
if (deviceProfile.getDefaultEdgeRuleChainId() != null
&& isEntityNotAssignedToEdge(tenantId, deviceProfile.getDefaultEdgeRuleChainId(), edgeId)) {
deviceProfile.setDefaultEdgeRuleChainId(null);
}
break;
} }
return deviceProfile; return deviceProfile;
} }
@ -708,4 +649,5 @@ public abstract class BaseEdgeProcessor {
} }
return true; return true;
} }
} }

View File

@ -16,12 +16,18 @@
package org.thingsboard.server.service.edge.rpc.processor.telemetry; package org.thingsboard.server.service.edge.rpc.processor.telemetry;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.edge.v1.EntityDataProto;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
@ -31,18 +37,28 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
@TbCoreComponent @TbCoreComponent
public class TelemetryEdgeProcessor extends BaseTelemetryProcessor { public class TelemetryEdgeProcessor extends BaseTelemetryProcessor {
@Value("${edges.rpc.max_telemetry_message_size:0}")
private int maxTelemetryMessageSize;
@Lazy
@Autowired
private NotificationRuleProcessor notificationRuleProcessor;
@Override @Override
protected String getMsgSourceKey() { protected String getMsgSourceKey() {
return DataConstants.EDGE_MSG_SOURCE; return DataConstants.EDGE_MSG_SOURCE;
} }
public DownlinkMsg convertTelemetryEventToDownlink(EdgeEvent edgeEvent) { public DownlinkMsg convertTelemetryEventToDownlink(Edge edge, EdgeEvent edgeEvent) {
if (edgeEvent.getBody() != null) { if (edgeEvent.getBody() != null) {
String bodyStr = edgeEvent.getBody().toString(); String bodyStr = edgeEvent.getBody().toString();
if (bodyStr.length() > 1000) { if (maxTelemetryMessageSize > 0 && bodyStr.length() > maxTelemetryMessageSize) {
log.debug("[{}][{}][{}] Conversion to a DownlinkMsg telemetry event failed due to a size limit violation. " + String error = "Conversion to a DownlinkMsg telemetry event failed due to a size limit violation.";
"Current size is {}, but the limit is 1000. {}", edgeEvent.getTenantId(), edgeEvent.getEdgeId(), String message = String.format("%s Current size is %s, but the limit is %s", error, bodyStr.length(), maxTelemetryMessageSize);
edgeEvent.getEntityId(), bodyStr.length(), StringUtils.truncate(bodyStr, 100)); log.debug("[{}][{}][{}] {}. {}", edgeEvent.getTenantId(), edgeEvent.getEdgeId(),
edgeEvent.getEntityId(), message, StringUtils.truncate(bodyStr, 100));
notificationRuleProcessor.process(EdgeCommunicationFailureTrigger.builder().tenantId(edgeEvent.getTenantId())
.edgeId(edgeEvent.getEdgeId()).customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(message).error(error).build());
return null; return null;
} }
} }
@ -55,4 +71,5 @@ public class TelemetryEdgeProcessor extends BaseTelemetryProcessor {
.addEntityData(entityDataProto) .addEntityData(entityDataProto)
.build(); .build();
} }
} }

View File

@ -60,4 +60,5 @@ public class TenantEdgeProcessor extends BaseEdgeProcessor {
} }
return downlinkMsg; return downlinkMsg;
} }
} }

View File

@ -1304,6 +1304,8 @@ edges:
private_key: "${EDGES_RPC_SSL_PRIVATE_KEY:privateKeyFile.pem}" private_key: "${EDGES_RPC_SSL_PRIVATE_KEY:privateKeyFile.pem}"
# Maximum size (in bytes) of inbound messages the cloud can handle from the edge. By default, it can handle messages up to 4 Megabytes # Maximum size (in bytes) of inbound messages the cloud can handle from the edge. By default, it can handle messages up to 4 Megabytes
max_inbound_message_size: "${EDGES_RPC_MAX_INBOUND_MESSAGE_SIZE:4194304}" max_inbound_message_size: "${EDGES_RPC_MAX_INBOUND_MESSAGE_SIZE:4194304}"
# Maximum length of telemetry (time-series and attributes) message the cloud sends to the edge. By default, there is no limitation.
max_telemetry_message_size: "${EDGES_RPC_MAX_TELEMETRY_MESSAGE_SIZE:0}"
storage: storage:
# Max records of edge event to read from DB and sent to the edge # Max records of edge event to read from DB and sent to the edge
max_read_records_count: "${EDGES_STORAGE_MAX_READ_RECORDS_COUNT:50}" max_read_records_count: "${EDGES_STORAGE_MAX_READ_RECORDS_COUNT:50}"

View File

@ -526,17 +526,18 @@ public abstract class BaseEdgeProcessorTest {
} }
protected static Stream<Arguments> provideParameters() { protected static Stream<Arguments> provideParameters() {
UUID dashoboardUUID = UUID.randomUUID(); UUID dashboardUUID = UUID.randomUUID();
UUID ruleChaindUUID = UUID.randomUUID(); UUID ruleChainUUID = UUID.randomUUID();
return Stream.of( return Stream.of(
Arguments.of(EdgeVersion.V_3_3_0, 0, 0, 0, 0), Arguments.of(EdgeVersion.V_3_3_0, 0, 0, 0, 0),
Arguments.of(EdgeVersion.V_3_3_3, 0, 0, 0, 0), Arguments.of(EdgeVersion.V_3_3_3, 0, 0, 0, 0),
Arguments.of(EdgeVersion.V_3_4_0, 0, 0, 0, 0), Arguments.of(EdgeVersion.V_3_4_0, 0, 0, 0, 0),
Arguments.of(EdgeVersion.V_3_6_0, Arguments.of(EdgeVersion.V_3_6_0,
dashoboardUUID.getMostSignificantBits(), dashboardUUID.getMostSignificantBits(),
dashoboardUUID.getLeastSignificantBits(), dashboardUUID.getLeastSignificantBits(),
ruleChaindUUID.getMostSignificantBits(), ruleChainUUID.getMostSignificantBits(),
ruleChaindUUID.getLeastSignificantBits()) ruleChainUUID.getLeastSignificantBits())
); );
} }
} }

View File

@ -16,27 +16,51 @@
package org.thingsboard.server.service.edge.rpc.processor.telemetry; package org.thingsboard.server.service.edge.rpc.processor.telemetry;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.Mockito;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessorTest;
@Slf4j import static org.mockito.ArgumentMatchers.any;
@RunWith(MockitoJUnitRunner.class) import static org.mockito.Mockito.verify;
public class TelemetryEdgeProcessorTest {
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {TelemetryEdgeProcessor.class})
@TestPropertySource(properties = {
"edges.rpc.max_telemetry_message_size=1000"
})
public class TelemetryEdgeProcessorTest extends BaseEdgeProcessorTest {
@SpyBean
private TelemetryEdgeProcessor telemetryEdgeProcessor;
@MockBean
private NotificationRuleProcessor notificationRuleProcessor;
@Test @Test
public void testConvert_maxSizeLimit() throws Exception { public void testConvert_maxSizeLimit() {
Edge edge = new Edge();
EdgeEvent edgeEvent = new EdgeEvent(); EdgeEvent edgeEvent = new EdgeEvent();
ObjectNode body = JacksonUtil.newObjectNode(); ObjectNode body = JacksonUtil.newObjectNode();
body.put("value", StringUtils.randomAlphanumeric(10000)); body.put("value", StringUtils.randomAlphanumeric(1000));
edgeEvent.setBody(body); edgeEvent.setBody(body);
DownlinkMsg downlinkMsg = new TelemetryEdgeProcessor().convertTelemetryEventToDownlink(edgeEvent);
DownlinkMsg downlinkMsg = telemetryEdgeProcessor.convertTelemetryEventToDownlink(edge, edgeEvent);
Assert.assertNull(downlinkMsg); Assert.assertNull(downlinkMsg);
verify(notificationRuleProcessor, Mockito.times(1)).process(any());
} }
} }

View File

@ -434,36 +434,28 @@ public class ProtoUtils {
} }
public static TransportProtos.ToDeviceActorNotificationMsgProto toProto(ToDeviceActorNotificationMsg msg) { public static TransportProtos.ToDeviceActorNotificationMsgProto toProto(ToDeviceActorNotificationMsg msg) {
if (msg instanceof DeviceEdgeUpdateMsg) { if (msg instanceof DeviceEdgeUpdateMsg updateMsg) {
DeviceEdgeUpdateMsg updateMsg = (DeviceEdgeUpdateMsg) msg;
TransportProtos.DeviceEdgeUpdateMsgProto proto = toProto(updateMsg); TransportProtos.DeviceEdgeUpdateMsgProto proto = toProto(updateMsg);
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceEdgeUpdateMsg(proto).build(); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceEdgeUpdateMsg(proto).build();
} else if (msg instanceof DeviceNameOrTypeUpdateMsg) { } else if (msg instanceof DeviceNameOrTypeUpdateMsg updateMsg) {
DeviceNameOrTypeUpdateMsg updateMsg = (DeviceNameOrTypeUpdateMsg) msg;
TransportProtos.DeviceNameOrTypeUpdateMsgProto proto = toProto(updateMsg); TransportProtos.DeviceNameOrTypeUpdateMsgProto proto = toProto(updateMsg);
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceNameOrTypeMsg(proto).build(); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceNameOrTypeMsg(proto).build();
} else if (msg instanceof DeviceAttributesEventNotificationMsg) { } else if (msg instanceof DeviceAttributesEventNotificationMsg updateMsg) {
DeviceAttributesEventNotificationMsg updateMsg = (DeviceAttributesEventNotificationMsg) msg;
TransportProtos.DeviceAttributesEventMsgProto proto = toProto(updateMsg); TransportProtos.DeviceAttributesEventMsgProto proto = toProto(updateMsg);
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceAttributesEventMsg(proto).build(); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceAttributesEventMsg(proto).build();
} else if (msg instanceof DeviceCredentialsUpdateNotificationMsg) { } else if (msg instanceof DeviceCredentialsUpdateNotificationMsg updateMsg) {
DeviceCredentialsUpdateNotificationMsg updateMsg = (DeviceCredentialsUpdateNotificationMsg) msg;
TransportProtos.DeviceCredentialsUpdateMsgProto proto = toProto(updateMsg); TransportProtos.DeviceCredentialsUpdateMsgProto proto = toProto(updateMsg);
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceCredentialsUpdateMsg(proto).build(); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceCredentialsUpdateMsg(proto).build();
} else if (msg instanceof ToDeviceRpcRequestActorMsg) { } else if (msg instanceof ToDeviceRpcRequestActorMsg updateMsg) {
ToDeviceRpcRequestActorMsg updateMsg = (ToDeviceRpcRequestActorMsg) msg;
TransportProtos.ToDeviceRpcRequestActorMsgProto proto = toProto(updateMsg); TransportProtos.ToDeviceRpcRequestActorMsgProto proto = toProto(updateMsg);
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setToDeviceRpcRequestMsg(proto).build(); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setToDeviceRpcRequestMsg(proto).build();
} else if (msg instanceof FromDeviceRpcResponseActorMsg) { } else if (msg instanceof FromDeviceRpcResponseActorMsg updateMsg) {
FromDeviceRpcResponseActorMsg updateMsg = (FromDeviceRpcResponseActorMsg) msg;
TransportProtos.FromDeviceRpcResponseActorMsgProto proto = toProto(updateMsg); TransportProtos.FromDeviceRpcResponseActorMsgProto proto = toProto(updateMsg);
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setFromDeviceRpcResponseMsg(proto).build(); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setFromDeviceRpcResponseMsg(proto).build();
} else if (msg instanceof RemoveRpcActorMsg) { } else if (msg instanceof RemoveRpcActorMsg updateMsg) {
RemoveRpcActorMsg updateMsg = (RemoveRpcActorMsg) msg;
TransportProtos.RemoveRpcActorMsgProto proto = toProto(updateMsg); TransportProtos.RemoveRpcActorMsgProto proto = toProto(updateMsg);
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setRemoveRpcActorMsg(proto).build(); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setRemoveRpcActorMsg(proto).build();
} else if (msg instanceof DeviceDeleteMsg) { } else if (msg instanceof DeviceDeleteMsg updateMsg) {
DeviceDeleteMsg updateMsg = (DeviceDeleteMsg) msg;
TransportProtos.DeviceDeleteMsgProto proto = toProto(updateMsg); TransportProtos.DeviceDeleteMsgProto proto = toProto(updateMsg);
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceDeleteMsg(proto).build(); return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceDeleteMsg(proto).build();
} }
@ -507,24 +499,14 @@ public class ProtoUtils {
List<AttributeKvEntry> result = new ArrayList<>(); List<AttributeKvEntry> result = new ArrayList<>();
for (TransportProtos.AttributeValueProto kvEntry : valuesList) { for (TransportProtos.AttributeValueProto kvEntry : valuesList) {
boolean hasValue = kvEntry.getHasV(); boolean hasValue = kvEntry.getHasV();
KvEntry entry = null; KvEntry entry = switch (kvEntry.getType()) {
switch (kvEntry.getType()) { case BOOLEAN_V -> new BooleanDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getBoolV() : null);
case BOOLEAN_V: case LONG_V -> new LongDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getLongV() : null);
entry = new BooleanDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getBoolV() : null); case DOUBLE_V -> new DoubleDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getDoubleV() : null);
break; case STRING_V -> new StringDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getStringV() : null);
case LONG_V: case JSON_V -> new JsonDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getJsonV() : null);
entry = new LongDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getLongV() : null); default -> null;
break; };
case DOUBLE_V:
entry = new DoubleDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getDoubleV() : null);
break;
case STRING_V:
entry = new StringDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getStringV() : null);
break;
case JSON_V:
entry = new JsonDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getJsonV() : null);
break;
}
result.add(new BaseAttributeKvEntry(kvEntry.getLastUpdateTs(), entry)); result.add(new BaseAttributeKvEntry(kvEntry.getLastUpdateTs(), entry));
} }
return result; return result;
@ -1029,15 +1011,11 @@ public class ProtoUtils {
.setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits()) .setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits())
.setAdditionalInfo(JacksonUtil.toString(device.getAdditionalInfo())); .setAdditionalInfo(JacksonUtil.toString(device.getAdditionalInfo()));
PowerSavingConfiguration psmConfiguration = null; PowerSavingConfiguration psmConfiguration = switch (device.getDeviceData().getTransportConfiguration().getType()) {
switch (device.getDeviceData().getTransportConfiguration().getType()) { case LWM2M -> (Lwm2mDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration();
case LWM2M: case COAP -> (CoapDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration();
psmConfiguration = (Lwm2mDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration(); default -> null;
break; };
case COAP:
psmConfiguration = (CoapDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration();
break;
}
if (psmConfiguration != null) { if (psmConfiguration != null) {
PowerMode powerMode = psmConfiguration.getPowerMode(); PowerMode powerMode = psmConfiguration.getPowerMode();
@ -1079,4 +1057,5 @@ public class ProtoUtils {
private static Long checkLong(Long l) { private static Long checkLong(Long l) {
return isNotNull(l) ? l : 0; return isNotNull(l) ? l : 0;
} }
} }