Add yaml parameter to set max size for telemetry message
This commit is contained in:
parent
ec36558f42
commit
0107bc29ad
@ -498,9 +498,9 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
log.trace("[{}][{}][{}] downlink msg(s) are going to be send.", this.tenantId, this.sessionId, copy.size());
|
||||
for (DownlinkMsg downlinkMsg : copy) {
|
||||
if (this.clientMaxInboundMessageSize != 0 && downlinkMsg.getSerializedSize() > this.clientMaxInboundMessageSize) {
|
||||
String error = String.format("Client max inbound message size [{%s}] is exceeded. Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE " +
|
||||
String error = String.format("Client max inbound message size %s is exceeded. Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE " +
|
||||
"env variable on the edge and restart it.", this.clientMaxInboundMessageSize);
|
||||
String message = String.format("Downlink msg size [{%s}] exceeds client max inbound message size [{%s}]. " +
|
||||
String message = String.format("Downlink msg size %s exceeds client max inbound message size %s. " +
|
||||
"Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE env variable on the edge and restart it.", downlinkMsg.getSerializedSize(), this.clientMaxInboundMessageSize);
|
||||
log.error("[{}][{}][{}] {} Message {}", this.tenantId, edge.getId(), this.sessionId, message, downlinkMsg);
|
||||
ctx.getNotificationRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId)
|
||||
@ -551,34 +551,13 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
DownlinkMsg downlinkMsg = null;
|
||||
try {
|
||||
switch (edgeEvent.getAction()) {
|
||||
case UPDATED:
|
||||
case ADDED:
|
||||
case DELETED:
|
||||
case ASSIGNED_TO_EDGE:
|
||||
case UNASSIGNED_FROM_EDGE:
|
||||
case ALARM_ACK:
|
||||
case ALARM_CLEAR:
|
||||
case ALARM_DELETE:
|
||||
case CREDENTIALS_UPDATED:
|
||||
case RELATION_ADD_OR_UPDATE:
|
||||
case RELATION_DELETED:
|
||||
case CREDENTIALS_REQUEST:
|
||||
case RPC_CALL:
|
||||
case ASSIGNED_TO_CUSTOMER:
|
||||
case UNASSIGNED_FROM_CUSTOMER:
|
||||
case ADDED_COMMENT:
|
||||
case UPDATED_COMMENT:
|
||||
case DELETED_COMMENT:
|
||||
case UPDATED, ADDED, DELETED, ASSIGNED_TO_EDGE, UNASSIGNED_FROM_EDGE, ALARM_ACK, ALARM_CLEAR, ALARM_DELETE, CREDENTIALS_UPDATED, RELATION_ADD_OR_UPDATE, RELATION_DELETED, CREDENTIALS_REQUEST, RPC_CALL, ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> {
|
||||
downlinkMsg = convertEntityEventToDownlink(edgeEvent);
|
||||
log.trace("[{}][{}] entity message processed [{}]", this.tenantId, this.sessionId, downlinkMsg);
|
||||
break;
|
||||
case ATTRIBUTES_UPDATED:
|
||||
case POST_ATTRIBUTES:
|
||||
case ATTRIBUTES_DELETED:
|
||||
case TIMESERIES_UPDATED:
|
||||
downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edgeEvent);
|
||||
break;
|
||||
default:
|
||||
}
|
||||
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED ->
|
||||
downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
|
||||
default ->
|
||||
log.warn("[{}][{}] Unsupported action type [{}]", this.tenantId, this.sessionId, edgeEvent.getAction());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@ -857,7 +836,7 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
.build();
|
||||
}
|
||||
String error = "Failed to validate the edge!";
|
||||
String failureMsg = String.format("{%s} Provided request secret: %s", error, request.getEdgeSecret());
|
||||
String failureMsg = String.format("%s Provided request secret: %s", error, request.getEdgeSecret());
|
||||
ctx.getNotificationRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId())
|
||||
.customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(error).build());
|
||||
return ConnectResponseMsg.newBuilder()
|
||||
|
||||
@ -25,7 +25,6 @@ import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.AttributeScope;
|
||||
import org.thingsboard.server.common.data.Dashboard;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
@ -93,7 +92,6 @@ import org.thingsboard.server.queue.TbQueueCallback;
|
||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.service.entitiy.TbLogEntityActionService;
|
||||
import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructorFactory;
|
||||
import org.thingsboard.server.service.edge.rpc.constructor.asset.AssetMsgConstructorFactory;
|
||||
import org.thingsboard.server.service.edge.rpc.constructor.customer.CustomerMsgConstructorFactory;
|
||||
@ -115,6 +113,7 @@ import org.thingsboard.server.service.edge.rpc.constructor.widget.WidgetMsgConst
|
||||
import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmEdgeProcessorFactory;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessorFactory;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.entityview.EntityViewProcessorFactory;
|
||||
import org.thingsboard.server.service.entitiy.TbLogEntityActionService;
|
||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
@ -356,35 +355,15 @@ public abstract class BaseEdgeProcessor {
|
||||
|
||||
private boolean doSaveIfEdgeIsOffline(EdgeEventType type,
|
||||
EdgeEventActionType action) {
|
||||
switch (action) {
|
||||
case TIMESERIES_UPDATED:
|
||||
case ALARM_ACK:
|
||||
case ALARM_CLEAR:
|
||||
case ALARM_ASSIGNED:
|
||||
case ALARM_UNASSIGNED:
|
||||
case CREDENTIALS_REQUEST:
|
||||
case ADDED_COMMENT:
|
||||
case UPDATED_COMMENT:
|
||||
return true;
|
||||
}
|
||||
switch (type) {
|
||||
case ALARM:
|
||||
case ALARM_COMMENT:
|
||||
case RULE_CHAIN:
|
||||
case RULE_CHAIN_METADATA:
|
||||
case USER:
|
||||
case CUSTOMER:
|
||||
case TENANT:
|
||||
case TENANT_PROFILE:
|
||||
case WIDGETS_BUNDLE:
|
||||
case WIDGET_TYPE:
|
||||
case ADMIN_SETTINGS:
|
||||
case OTA_PACKAGE:
|
||||
case QUEUE:
|
||||
case RELATION:
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
return switch (action) {
|
||||
case TIMESERIES_UPDATED, ALARM_ACK, ALARM_CLEAR, ALARM_ASSIGNED, ALARM_UNASSIGNED, CREDENTIALS_REQUEST, ADDED_COMMENT, UPDATED_COMMENT ->
|
||||
true;
|
||||
default -> switch (type) {
|
||||
case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE, WIDGETS_BUNDLE, WIDGET_TYPE, ADMIN_SETTINGS, OTA_PACKAGE, QUEUE, RELATION ->
|
||||
true;
|
||||
default -> false;
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> doSaveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, EdgeEventActionType action, EntityId entityId, JsonNode body) {
|
||||
@ -435,31 +414,17 @@ public abstract class BaseEdgeProcessor {
|
||||
}
|
||||
|
||||
protected UpdateMsgType getUpdateMsgType(EdgeEventActionType actionType) {
|
||||
switch (actionType) {
|
||||
case UPDATED:
|
||||
case CREDENTIALS_UPDATED:
|
||||
case ASSIGNED_TO_CUSTOMER:
|
||||
case UNASSIGNED_FROM_CUSTOMER:
|
||||
case UPDATED_COMMENT:
|
||||
return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE;
|
||||
case ADDED:
|
||||
case ASSIGNED_TO_EDGE:
|
||||
case RELATION_ADD_OR_UPDATE:
|
||||
case ADDED_COMMENT:
|
||||
return UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE;
|
||||
case DELETED:
|
||||
case UNASSIGNED_FROM_EDGE:
|
||||
case RELATION_DELETED:
|
||||
case DELETED_COMMENT:
|
||||
case ALARM_DELETE:
|
||||
return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE;
|
||||
case ALARM_ACK:
|
||||
return UpdateMsgType.ALARM_ACK_RPC_MESSAGE;
|
||||
case ALARM_CLEAR:
|
||||
return UpdateMsgType.ALARM_CLEAR_RPC_MESSAGE;
|
||||
default:
|
||||
throw new RuntimeException("Unsupported actionType [" + actionType + "]");
|
||||
}
|
||||
return switch (actionType) {
|
||||
case UPDATED, CREDENTIALS_UPDATED, ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, UPDATED_COMMENT ->
|
||||
UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE;
|
||||
case ADDED, ASSIGNED_TO_EDGE, RELATION_ADD_OR_UPDATE, ADDED_COMMENT ->
|
||||
UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE;
|
||||
case DELETED, UNASSIGNED_FROM_EDGE, RELATION_DELETED, DELETED_COMMENT, ALARM_DELETE ->
|
||||
UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE;
|
||||
case ALARM_ACK -> UpdateMsgType.ALARM_ACK_RPC_MESSAGE;
|
||||
case ALARM_CLEAR -> UpdateMsgType.ALARM_CLEAR_RPC_MESSAGE;
|
||||
default -> throw new RuntimeException("Unsupported actionType [" + actionType + "]");
|
||||
};
|
||||
}
|
||||
|
||||
public ListenableFuture<Void> processEntityNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||
@ -554,15 +519,11 @@ public abstract class BaseEdgeProcessor {
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> processEntityNotificationForAllEdges(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId, EdgeId sourceEdgeId) {
|
||||
switch (actionType) {
|
||||
case ADDED:
|
||||
case UPDATED:
|
||||
case DELETED:
|
||||
case CREDENTIALS_UPDATED: // used by USER entity
|
||||
return processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId);
|
||||
default:
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
return switch (actionType) {
|
||||
case ADDED, UPDATED, DELETED, CREDENTIALS_UPDATED -> // used by USER entity
|
||||
processActionForAllEdges(tenantId, type, actionType, entityId, null, sourceEdgeId);
|
||||
default -> Futures.immediateFuture(null);
|
||||
};
|
||||
}
|
||||
|
||||
protected EntityId constructEntityId(String entityTypeStr, long entityIdMSB, long entityIdLSB) {
|
||||
@ -605,26 +566,18 @@ public abstract class BaseEdgeProcessor {
|
||||
}
|
||||
|
||||
protected boolean isEntityExists(TenantId tenantId, EntityId entityId) {
|
||||
switch (entityId.getEntityType()) {
|
||||
case TENANT:
|
||||
return tenantService.findTenantById(tenantId) != null;
|
||||
case DEVICE:
|
||||
return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null;
|
||||
case ASSET:
|
||||
return assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null;
|
||||
case ENTITY_VIEW:
|
||||
return entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null;
|
||||
case CUSTOMER:
|
||||
return customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null;
|
||||
case USER:
|
||||
return userService.findUserById(tenantId, new UserId(entityId.getId())) != null;
|
||||
case DASHBOARD:
|
||||
return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null;
|
||||
case EDGE:
|
||||
return edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
return switch (entityId.getEntityType()) {
|
||||
case TENANT -> tenantService.findTenantById(tenantId) != null;
|
||||
case DEVICE -> deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null;
|
||||
case ASSET -> assetService.findAssetById(tenantId, new AssetId(entityId.getId())) != null;
|
||||
case ENTITY_VIEW ->
|
||||
entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())) != null;
|
||||
case CUSTOMER -> customerService.findCustomerById(tenantId, new CustomerId(entityId.getId())) != null;
|
||||
case USER -> userService.findUserById(tenantId, new UserId(entityId.getId())) != null;
|
||||
case DASHBOARD -> dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null;
|
||||
case EDGE -> edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null;
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
protected void createRelationFromEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId) {
|
||||
@ -663,37 +616,25 @@ public abstract class BaseEdgeProcessor {
|
||||
}
|
||||
|
||||
protected AssetProfile checkIfAssetProfileDefaultFieldsAssignedToEdge(TenantId tenantId, EdgeId edgeId, AssetProfile assetProfile, EdgeVersion edgeVersion) {
|
||||
switch (edgeVersion) {
|
||||
case V_3_3_3:
|
||||
case V_3_3_0:
|
||||
case V_3_4_0:
|
||||
if (assetProfile.getDefaultDashboardId() != null
|
||||
&& isEntityNotAssignedToEdge(tenantId, assetProfile.getDefaultDashboardId(), edgeId)) {
|
||||
if (EdgeVersion.V_3_3_0.equals(edgeVersion) || EdgeVersion.V_3_3_3.equals(edgeVersion) || EdgeVersion.V_3_4_0.equals(edgeVersion)) {
|
||||
if (assetProfile.getDefaultDashboardId() != null && isEntityNotAssignedToEdge(tenantId, assetProfile.getDefaultDashboardId(), edgeId)) {
|
||||
assetProfile.setDefaultDashboardId(null);
|
||||
}
|
||||
if (assetProfile.getDefaultEdgeRuleChainId() != null
|
||||
&& isEntityNotAssignedToEdge(tenantId, assetProfile.getDefaultEdgeRuleChainId(), edgeId)) {
|
||||
if (assetProfile.getDefaultEdgeRuleChainId() != null && isEntityNotAssignedToEdge(tenantId, assetProfile.getDefaultEdgeRuleChainId(), edgeId)) {
|
||||
assetProfile.setDefaultEdgeRuleChainId(null);
|
||||
}
|
||||
break;
|
||||
}
|
||||
return assetProfile;
|
||||
}
|
||||
|
||||
protected DeviceProfile checkIfDeviceProfileDefaultFieldsAssignedToEdge(TenantId tenantId, EdgeId edgeId, DeviceProfile deviceProfile, EdgeVersion edgeVersion) {
|
||||
switch (edgeVersion) {
|
||||
case V_3_3_3:
|
||||
case V_3_3_0:
|
||||
case V_3_4_0:
|
||||
if (deviceProfile.getDefaultDashboardId() != null
|
||||
&& isEntityNotAssignedToEdge(tenantId, deviceProfile.getDefaultDashboardId(), edgeId)) {
|
||||
if (EdgeVersion.V_3_3_0.equals(edgeVersion) || EdgeVersion.V_3_3_3.equals(edgeVersion) || EdgeVersion.V_3_4_0.equals(edgeVersion)) {
|
||||
if (deviceProfile.getDefaultDashboardId() != null && isEntityNotAssignedToEdge(tenantId, deviceProfile.getDefaultDashboardId(), edgeId)) {
|
||||
deviceProfile.setDefaultDashboardId(null);
|
||||
}
|
||||
if (deviceProfile.getDefaultEdgeRuleChainId() != null
|
||||
&& isEntityNotAssignedToEdge(tenantId, deviceProfile.getDefaultEdgeRuleChainId(), edgeId)) {
|
||||
if (deviceProfile.getDefaultEdgeRuleChainId() != null && isEntityNotAssignedToEdge(tenantId, deviceProfile.getDefaultEdgeRuleChainId(), edgeId)) {
|
||||
deviceProfile.setDefaultEdgeRuleChainId(null);
|
||||
}
|
||||
break;
|
||||
}
|
||||
return deviceProfile;
|
||||
}
|
||||
@ -708,4 +649,5 @@ public abstract class BaseEdgeProcessor {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,12 +16,18 @@
|
||||
package org.thingsboard.server.service.edge.rpc.processor.telemetry;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger;
|
||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.EntityDataProto;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
@ -31,18 +37,28 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
@TbCoreComponent
|
||||
public class TelemetryEdgeProcessor extends BaseTelemetryProcessor {
|
||||
|
||||
@Value("${edges.rpc.max_telemetry_message_size:0}")
|
||||
private int maxTelemetryMessageSize;
|
||||
|
||||
@Lazy
|
||||
@Autowired
|
||||
private NotificationRuleProcessor notificationRuleProcessor;
|
||||
|
||||
@Override
|
||||
protected String getMsgSourceKey() {
|
||||
return DataConstants.EDGE_MSG_SOURCE;
|
||||
}
|
||||
|
||||
public DownlinkMsg convertTelemetryEventToDownlink(EdgeEvent edgeEvent) {
|
||||
public DownlinkMsg convertTelemetryEventToDownlink(Edge edge, EdgeEvent edgeEvent) {
|
||||
if (edgeEvent.getBody() != null) {
|
||||
String bodyStr = edgeEvent.getBody().toString();
|
||||
if (bodyStr.length() > 1000) {
|
||||
log.debug("[{}][{}][{}] Conversion to a DownlinkMsg telemetry event failed due to a size limit violation. " +
|
||||
"Current size is {}, but the limit is 1000. {}", edgeEvent.getTenantId(), edgeEvent.getEdgeId(),
|
||||
edgeEvent.getEntityId(), bodyStr.length(), StringUtils.truncate(bodyStr, 100));
|
||||
if (maxTelemetryMessageSize > 0 && bodyStr.length() > maxTelemetryMessageSize) {
|
||||
String error = "Conversion to a DownlinkMsg telemetry event failed due to a size limit violation.";
|
||||
String message = String.format("%s Current size is %s, but the limit is %s", error, bodyStr.length(), maxTelemetryMessageSize);
|
||||
log.debug("[{}][{}][{}] {}. {}", edgeEvent.getTenantId(), edgeEvent.getEdgeId(),
|
||||
edgeEvent.getEntityId(), message, StringUtils.truncate(bodyStr, 100));
|
||||
notificationRuleProcessor.process(EdgeCommunicationFailureTrigger.builder().tenantId(edgeEvent.getTenantId())
|
||||
.edgeId(edgeEvent.getEdgeId()).customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(message).error(error).build());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -55,4 +71,5 @@ public class TelemetryEdgeProcessor extends BaseTelemetryProcessor {
|
||||
.addEntityData(entityDataProto)
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -60,4 +60,5 @@ public class TenantEdgeProcessor extends BaseEdgeProcessor {
|
||||
}
|
||||
return downlinkMsg;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1304,6 +1304,8 @@ edges:
|
||||
private_key: "${EDGES_RPC_SSL_PRIVATE_KEY:privateKeyFile.pem}"
|
||||
# Maximum size (in bytes) of inbound messages the cloud can handle from the edge. By default, it can handle messages up to 4 Megabytes
|
||||
max_inbound_message_size: "${EDGES_RPC_MAX_INBOUND_MESSAGE_SIZE:4194304}"
|
||||
# Maximum length of telemetry (time-series and attributes) message the cloud sends to the edge. By default, there is no limitation.
|
||||
max_telemetry_message_size: "${EDGES_RPC_MAX_TELEMETRY_MESSAGE_SIZE:300}"
|
||||
storage:
|
||||
# Max records of edge event to read from DB and sent to the edge
|
||||
max_read_records_count: "${EDGES_STORAGE_MAX_READ_RECORDS_COUNT:50}"
|
||||
|
||||
@ -526,17 +526,18 @@ public abstract class BaseEdgeProcessorTest {
|
||||
}
|
||||
|
||||
protected static Stream<Arguments> provideParameters() {
|
||||
UUID dashoboardUUID = UUID.randomUUID();
|
||||
UUID ruleChaindUUID = UUID.randomUUID();
|
||||
UUID dashboardUUID = UUID.randomUUID();
|
||||
UUID ruleChainUUID = UUID.randomUUID();
|
||||
return Stream.of(
|
||||
Arguments.of(EdgeVersion.V_3_3_0, 0, 0, 0, 0),
|
||||
Arguments.of(EdgeVersion.V_3_3_3, 0, 0, 0, 0),
|
||||
Arguments.of(EdgeVersion.V_3_4_0, 0, 0, 0, 0),
|
||||
Arguments.of(EdgeVersion.V_3_6_0,
|
||||
dashoboardUUID.getMostSignificantBits(),
|
||||
dashoboardUUID.getLeastSignificantBits(),
|
||||
ruleChaindUUID.getMostSignificantBits(),
|
||||
ruleChaindUUID.getLeastSignificantBits())
|
||||
dashboardUUID.getMostSignificantBits(),
|
||||
dashboardUUID.getLeastSignificantBits(),
|
||||
ruleChainUUID.getMostSignificantBits(),
|
||||
ruleChainUUID.getLeastSignificantBits())
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,27 +16,51 @@
|
||||
package org.thingsboard.server.service.edge.rpc.processor.telemetry;
|
||||
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||
import org.springframework.boot.test.mock.mockito.SpyBean;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessorTest;
|
||||
|
||||
@Slf4j
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class TelemetryEdgeProcessorTest {
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@ContextConfiguration(classes = {TelemetryEdgeProcessor.class})
|
||||
@TestPropertySource(properties = {
|
||||
"edges.rpc.max_telemetry_message_size=1000"
|
||||
})
|
||||
public class TelemetryEdgeProcessorTest extends BaseEdgeProcessorTest {
|
||||
|
||||
@SpyBean
|
||||
private TelemetryEdgeProcessor telemetryEdgeProcessor;
|
||||
|
||||
@MockBean
|
||||
private NotificationRuleProcessor notificationRuleProcessor;
|
||||
|
||||
@Test
|
||||
public void testConvert_maxSizeLimit() throws Exception {
|
||||
public void testConvert_maxSizeLimit() {
|
||||
Edge edge = new Edge();
|
||||
EdgeEvent edgeEvent = new EdgeEvent();
|
||||
ObjectNode body = JacksonUtil.newObjectNode();
|
||||
body.put("value", StringUtils.randomAlphanumeric(10000));
|
||||
body.put("value", StringUtils.randomAlphanumeric(1000));
|
||||
edgeEvent.setBody(body);
|
||||
DownlinkMsg downlinkMsg = new TelemetryEdgeProcessor().convertTelemetryEventToDownlink(edgeEvent);
|
||||
|
||||
DownlinkMsg downlinkMsg = telemetryEdgeProcessor.convertTelemetryEventToDownlink(edge, edgeEvent);
|
||||
Assert.assertNull(downlinkMsg);
|
||||
|
||||
verify(notificationRuleProcessor, Mockito.times(1)).process(any());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -434,36 +434,28 @@ public class ProtoUtils {
|
||||
}
|
||||
|
||||
public static TransportProtos.ToDeviceActorNotificationMsgProto toProto(ToDeviceActorNotificationMsg msg) {
|
||||
if (msg instanceof DeviceEdgeUpdateMsg) {
|
||||
DeviceEdgeUpdateMsg updateMsg = (DeviceEdgeUpdateMsg) msg;
|
||||
if (msg instanceof DeviceEdgeUpdateMsg updateMsg) {
|
||||
TransportProtos.DeviceEdgeUpdateMsgProto proto = toProto(updateMsg);
|
||||
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceEdgeUpdateMsg(proto).build();
|
||||
} else if (msg instanceof DeviceNameOrTypeUpdateMsg) {
|
||||
DeviceNameOrTypeUpdateMsg updateMsg = (DeviceNameOrTypeUpdateMsg) msg;
|
||||
} else if (msg instanceof DeviceNameOrTypeUpdateMsg updateMsg) {
|
||||
TransportProtos.DeviceNameOrTypeUpdateMsgProto proto = toProto(updateMsg);
|
||||
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceNameOrTypeMsg(proto).build();
|
||||
} else if (msg instanceof DeviceAttributesEventNotificationMsg) {
|
||||
DeviceAttributesEventNotificationMsg updateMsg = (DeviceAttributesEventNotificationMsg) msg;
|
||||
} else if (msg instanceof DeviceAttributesEventNotificationMsg updateMsg) {
|
||||
TransportProtos.DeviceAttributesEventMsgProto proto = toProto(updateMsg);
|
||||
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceAttributesEventMsg(proto).build();
|
||||
} else if (msg instanceof DeviceCredentialsUpdateNotificationMsg) {
|
||||
DeviceCredentialsUpdateNotificationMsg updateMsg = (DeviceCredentialsUpdateNotificationMsg) msg;
|
||||
} else if (msg instanceof DeviceCredentialsUpdateNotificationMsg updateMsg) {
|
||||
TransportProtos.DeviceCredentialsUpdateMsgProto proto = toProto(updateMsg);
|
||||
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceCredentialsUpdateMsg(proto).build();
|
||||
} else if (msg instanceof ToDeviceRpcRequestActorMsg) {
|
||||
ToDeviceRpcRequestActorMsg updateMsg = (ToDeviceRpcRequestActorMsg) msg;
|
||||
} else if (msg instanceof ToDeviceRpcRequestActorMsg updateMsg) {
|
||||
TransportProtos.ToDeviceRpcRequestActorMsgProto proto = toProto(updateMsg);
|
||||
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setToDeviceRpcRequestMsg(proto).build();
|
||||
} else if (msg instanceof FromDeviceRpcResponseActorMsg) {
|
||||
FromDeviceRpcResponseActorMsg updateMsg = (FromDeviceRpcResponseActorMsg) msg;
|
||||
} else if (msg instanceof FromDeviceRpcResponseActorMsg updateMsg) {
|
||||
TransportProtos.FromDeviceRpcResponseActorMsgProto proto = toProto(updateMsg);
|
||||
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setFromDeviceRpcResponseMsg(proto).build();
|
||||
} else if (msg instanceof RemoveRpcActorMsg) {
|
||||
RemoveRpcActorMsg updateMsg = (RemoveRpcActorMsg) msg;
|
||||
} else if (msg instanceof RemoveRpcActorMsg updateMsg) {
|
||||
TransportProtos.RemoveRpcActorMsgProto proto = toProto(updateMsg);
|
||||
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setRemoveRpcActorMsg(proto).build();
|
||||
} else if (msg instanceof DeviceDeleteMsg) {
|
||||
DeviceDeleteMsg updateMsg = (DeviceDeleteMsg) msg;
|
||||
} else if (msg instanceof DeviceDeleteMsg updateMsg) {
|
||||
TransportProtos.DeviceDeleteMsgProto proto = toProto(updateMsg);
|
||||
return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceDeleteMsg(proto).build();
|
||||
}
|
||||
@ -507,24 +499,14 @@ public class ProtoUtils {
|
||||
List<AttributeKvEntry> result = new ArrayList<>();
|
||||
for (TransportProtos.AttributeValueProto kvEntry : valuesList) {
|
||||
boolean hasValue = kvEntry.getHasV();
|
||||
KvEntry entry = null;
|
||||
switch (kvEntry.getType()) {
|
||||
case BOOLEAN_V:
|
||||
entry = new BooleanDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getBoolV() : null);
|
||||
break;
|
||||
case LONG_V:
|
||||
entry = new LongDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getLongV() : null);
|
||||
break;
|
||||
case DOUBLE_V:
|
||||
entry = new DoubleDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getDoubleV() : null);
|
||||
break;
|
||||
case STRING_V:
|
||||
entry = new StringDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getStringV() : null);
|
||||
break;
|
||||
case JSON_V:
|
||||
entry = new JsonDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getJsonV() : null);
|
||||
break;
|
||||
}
|
||||
KvEntry entry = switch (kvEntry.getType()) {
|
||||
case BOOLEAN_V -> new BooleanDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getBoolV() : null);
|
||||
case LONG_V -> new LongDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getLongV() : null);
|
||||
case DOUBLE_V -> new DoubleDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getDoubleV() : null);
|
||||
case STRING_V -> new StringDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getStringV() : null);
|
||||
case JSON_V -> new JsonDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getJsonV() : null);
|
||||
default -> null;
|
||||
};
|
||||
result.add(new BaseAttributeKvEntry(kvEntry.getLastUpdateTs(), entry));
|
||||
}
|
||||
return result;
|
||||
@ -1029,15 +1011,11 @@ public class ProtoUtils {
|
||||
.setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits())
|
||||
.setAdditionalInfo(JacksonUtil.toString(device.getAdditionalInfo()));
|
||||
|
||||
PowerSavingConfiguration psmConfiguration = null;
|
||||
switch (device.getDeviceData().getTransportConfiguration().getType()) {
|
||||
case LWM2M:
|
||||
psmConfiguration = (Lwm2mDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration();
|
||||
break;
|
||||
case COAP:
|
||||
psmConfiguration = (CoapDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration();
|
||||
break;
|
||||
}
|
||||
PowerSavingConfiguration psmConfiguration = switch (device.getDeviceData().getTransportConfiguration().getType()) {
|
||||
case LWM2M -> (Lwm2mDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration();
|
||||
case COAP -> (CoapDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration();
|
||||
default -> null;
|
||||
};
|
||||
|
||||
if (psmConfiguration != null) {
|
||||
PowerMode powerMode = psmConfiguration.getPowerMode();
|
||||
@ -1079,4 +1057,5 @@ public class ProtoUtils {
|
||||
private static Long checkLong(Long l) {
|
||||
return isNotNull(l) ? l : 0;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user