Merge pull request #7478 from volodymyr-babak/feature/edge-limit-attempt
[3.4.2] Added functionality to drop messages that are not able to be processed by edge
This commit is contained in:
commit
8c2c007c0f
@ -86,6 +86,8 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
|
||||
private static final ReentrantLock downlinkMsgLock = new ReentrantLock();
|
||||
|
||||
private static final int MAX_DOWNLINK_ATTEMPTS = 10; // max number of attemps to send downlink message if edge connected
|
||||
|
||||
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
|
||||
|
||||
private final UUID sessionId;
|
||||
@ -156,12 +158,13 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
log.error("Failed to deliver message from client!", t);
|
||||
log.error("[{}] Stream was terminated due to error:", sessionId, t);
|
||||
closeSession();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
log.info("[{}] Stream was closed and completed successfully!", sessionId);
|
||||
closeSession();
|
||||
}
|
||||
|
||||
@ -184,6 +187,10 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
public void startSyncProcess(TenantId tenantId, EdgeId edgeId, boolean fullSync) {
|
||||
log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId);
|
||||
syncCompleted = false;
|
||||
if (sessionState.getSendDownlinkMsgsFuture() != null && sessionState.getSendDownlinkMsgsFuture().isDone()) {
|
||||
String errorMsg = String.format("[%s][%s] Sync process started. General processing interrupted!", tenantId, edgeId);
|
||||
sessionState.getSendDownlinkMsgsFuture().setException(new RuntimeException(errorMsg));
|
||||
}
|
||||
doSync(new EdgeSyncCursor(ctx, edge, fullSync));
|
||||
}
|
||||
|
||||
@ -252,9 +259,9 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
try {
|
||||
if (msg.getSuccess()) {
|
||||
sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
|
||||
log.debug("[{}] Msg has been processed successfully! {}", edge.getRoutingKey(), msg);
|
||||
log.debug("[{}] Msg has been processed successfully!Msd Id: [{}], Msg: {}", edge.getRoutingKey(), msg.getDownlinkMsgId(), msg);
|
||||
} else {
|
||||
log.error("[{}] Msg processing failed! Error msg: {}", edge.getRoutingKey(), msg.getErrorMsg());
|
||||
log.error("[{}] Msg processing failed! Msd Id: [{}], Error msg: {}", edge.getRoutingKey(), msg.getDownlinkMsgId(), msg.getErrorMsg());
|
||||
}
|
||||
if (sessionState.getPendingMsgsMap().isEmpty()) {
|
||||
log.debug("[{}] Pending msgs map is empty. Stopping current iteration", edge.getRoutingKey());
|
||||
@ -392,17 +399,17 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
sessionState.setSendDownlinkMsgsFuture(SettableFuture.create());
|
||||
sessionState.getPendingMsgsMap().clear();
|
||||
downlinkMsgsPack.forEach(msg -> sessionState.getPendingMsgsMap().put(msg.getDownlinkMsgId(), msg));
|
||||
scheduleDownlinkMsgsPackSend(true);
|
||||
scheduleDownlinkMsgsPackSend(1);
|
||||
return sessionState.getSendDownlinkMsgsFuture();
|
||||
}
|
||||
|
||||
private void scheduleDownlinkMsgsPackSend(boolean firstRun) {
|
||||
private void scheduleDownlinkMsgsPackSend(int attempt) {
|
||||
Runnable sendDownlinkMsgsTask = () -> {
|
||||
try {
|
||||
if (isConnected() && sessionState.getPendingMsgsMap().values().size() > 0) {
|
||||
List<DownlinkMsg> copy = new ArrayList<>(sessionState.getPendingMsgsMap().values());
|
||||
if (!firstRun) {
|
||||
log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, copy);
|
||||
if (attempt > 1) {
|
||||
log.warn("[{}] Failed to deliver the batch: {}, attempt: {}", this.sessionId, copy, attempt);
|
||||
}
|
||||
log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, copy.size());
|
||||
for (DownlinkMsg downlinkMsg : copy) {
|
||||
@ -410,7 +417,13 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
.setDownlinkMsg(downlinkMsg)
|
||||
.build());
|
||||
}
|
||||
scheduleDownlinkMsgsPackSend(false);
|
||||
if (attempt < MAX_DOWNLINK_ATTEMPTS) {
|
||||
scheduleDownlinkMsgsPackSend(attempt + 1);
|
||||
} else {
|
||||
log.warn("[{}] Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}",
|
||||
this.sessionId, MAX_DOWNLINK_ATTEMPTS, copy);
|
||||
sessionState.getSendDownlinkMsgsFuture().set(null);
|
||||
}
|
||||
} else {
|
||||
sessionState.getSendDownlinkMsgsFuture().set(null);
|
||||
}
|
||||
@ -419,7 +432,7 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
}
|
||||
};
|
||||
|
||||
if (firstRun) {
|
||||
if (attempt == 1) {
|
||||
sendDownlinkExecutorService.submit(sendDownlinkMsgsTask);
|
||||
} else {
|
||||
sessionState.setScheduledSendDownlinkTask(
|
||||
@ -548,7 +561,7 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
try {
|
||||
if (uplinkMsg.getEntityDataCount() > 0) {
|
||||
for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) {
|
||||
result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData));
|
||||
result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), entityData));
|
||||
}
|
||||
}
|
||||
if (uplinkMsg.getDeviceUpdateMsgCount() > 0) {
|
||||
|
||||
@ -15,13 +15,16 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.constructor;
|
||||
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.JsonArray;
|
||||
import com.google.gson.JsonElement;
|
||||
import com.google.gson.JsonObject;
|
||||
import com.google.gson.JsonPrimitive;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
||||
@ -62,7 +65,7 @@ public class EntityDataMsgConstructor {
|
||||
JsonObject data = entityData.getAsJsonObject();
|
||||
TransportProtos.PostAttributeMsg attributesUpdatedMsg = JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv"));
|
||||
builder.setAttributesUpdatedMsg(attributesUpdatedMsg);
|
||||
builder.setPostAttributeScope(data.getAsJsonPrimitive("scope").getAsString());
|
||||
builder.setPostAttributeScope(getScopeOfDefault(data));
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] Can't convert to AttributesUpdatedMsg proto, entityData [{}]", entityId, entityData, e);
|
||||
}
|
||||
@ -72,7 +75,7 @@ public class EntityDataMsgConstructor {
|
||||
JsonObject data = entityData.getAsJsonObject();
|
||||
TransportProtos.PostAttributeMsg postAttributesMsg = JsonConverter.convertToAttributesProto(data.getAsJsonObject("kv"));
|
||||
builder.setPostAttributesMsg(postAttributesMsg);
|
||||
builder.setPostAttributeScope(data.getAsJsonPrimitive("scope").getAsString());
|
||||
builder.setPostAttributeScope(getScopeOfDefault(data));
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] Can't convert to PostAttributesMsg, entityData [{}]", entityId, entityData, e);
|
||||
}
|
||||
@ -94,4 +97,13 @@ public class EntityDataMsgConstructor {
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private String getScopeOfDefault(JsonObject data) {
|
||||
JsonPrimitive scope = data.getAsJsonPrimitive("scope");
|
||||
String result = DataConstants.SERVER_SCOPE;
|
||||
if (scope != null && StringUtils.isNotBlank(scope.getAsString())) {
|
||||
result = scope.getAsString();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -24,15 +24,22 @@ import org.springframework.context.annotation.Lazy;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.DashboardId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
import org.thingsboard.server.common.data.id.EntityViewId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
@ -267,17 +274,21 @@ public abstract class BaseEdgeProcessor {
|
||||
do {
|
||||
tenantsIds = tenantService.findTenantsIds(pageLink);
|
||||
for (TenantId tenantId1 : tenantsIds.getData()) {
|
||||
futures.addAll(processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId));
|
||||
futures.addAll(processActionForAllEdgesByTenantId(tenantId1, type, actionType, entityId, null));
|
||||
}
|
||||
pageLink = pageLink.nextPageLink();
|
||||
} while (tenantsIds.hasNext());
|
||||
} else {
|
||||
futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId);
|
||||
futures = processActionForAllEdgesByTenantId(tenantId, type, actionType, entityId, null);
|
||||
}
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}
|
||||
|
||||
private List<ListenableFuture<Void>> processActionForAllEdgesByTenantId(TenantId tenantId, EdgeEventType type, EdgeEventActionType actionType, EntityId entityId) {
|
||||
protected List<ListenableFuture<Void>> processActionForAllEdgesByTenantId(TenantId tenantId,
|
||||
EdgeEventType type,
|
||||
EdgeEventActionType actionType,
|
||||
EntityId entityId,
|
||||
JsonNode body) {
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<Edge> pageData;
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
@ -285,7 +296,7 @@ public abstract class BaseEdgeProcessor {
|
||||
pageData = edgeService.findEdgesByTenantId(tenantId, pageLink);
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
for (Edge edge : pageData.getData()) {
|
||||
futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, null));
|
||||
futures.add(saveEdgeEvent(tenantId, edge.getId(), type, actionType, entityId, body));
|
||||
}
|
||||
if (pageData.hasNext()) {
|
||||
pageLink = pageLink.nextPageLink();
|
||||
@ -425,4 +436,30 @@ public abstract class BaseEdgeProcessor {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
}
|
||||
|
||||
protected EntityId constructEntityId(String entityTypeStr, long entityIdMSB, long entityIdLSB) {
|
||||
EntityType entityType = EntityType.valueOf(entityTypeStr);
|
||||
switch (entityType) {
|
||||
case DEVICE:
|
||||
return new DeviceId(new UUID(entityIdMSB, entityIdLSB));
|
||||
case ASSET:
|
||||
return new AssetId(new UUID(entityIdMSB, entityIdLSB));
|
||||
case ENTITY_VIEW:
|
||||
return new EntityViewId(new UUID(entityIdMSB, entityIdLSB));
|
||||
case DASHBOARD:
|
||||
return new DashboardId(new UUID(entityIdMSB, entityIdLSB));
|
||||
case TENANT:
|
||||
return TenantId.fromUUID(new UUID(entityIdMSB, entityIdLSB));
|
||||
case CUSTOMER:
|
||||
return new CustomerId(new UUID(entityIdMSB, entityIdLSB));
|
||||
case USER:
|
||||
return new UserId(new UUID(entityIdMSB, entityIdLSB));
|
||||
case EDGE:
|
||||
return new EdgeId(new UUID(entityIdMSB, entityIdLSB));
|
||||
default:
|
||||
log.warn("Unsupported entity type [{}] during construct of entity id. entityIdMSB [{}], entityIdLSB [{}]",
|
||||
entityTypeStr, entityIdMSB, entityIdLSB);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,7 +17,6 @@ package org.thingsboard.server.service.edge.rpc.processor;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.google.gson.Gson;
|
||||
@ -36,8 +35,8 @@ import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.EntityView;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.asset.AssetProfile;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
@ -49,7 +48,6 @@ import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityViewId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKey;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
@ -59,7 +57,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
||||
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
||||
import org.thingsboard.server.common.transport.util.JsonUtils;
|
||||
import org.thingsboard.server.controller.BaseController;
|
||||
import org.thingsboard.server.dao.model.ModelConstants;
|
||||
import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.EntityDataProto;
|
||||
@ -76,7 +74,6 @@ import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@ -92,19 +89,21 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
|
||||
}
|
||||
|
||||
public List<ListenableFuture<Void>> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) {
|
||||
log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData);
|
||||
public List<ListenableFuture<Void>> processTelemetryFromEdge(TenantId tenantId, EntityDataProto entityData) {
|
||||
log.trace("[{}] processTelemetryFromEdge [{}]", tenantId, entityData);
|
||||
List<ListenableFuture<Void>> result = new ArrayList<>();
|
||||
EntityId entityId = constructEntityId(entityData);
|
||||
EntityId entityId = constructEntityId(entityData.getEntityType(), entityData.getEntityIdMSB(), entityData.getEntityIdLSB());
|
||||
if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) {
|
||||
TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId);
|
||||
Pair<TbMsgMetaData, CustomerId> pair = getBaseMsgMetadataAndCustomerId(tenantId, entityId);
|
||||
TbMsgMetaData metaData = pair.getKey();
|
||||
CustomerId customerId = pair.getValue();
|
||||
metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE);
|
||||
if (entityData.hasPostAttributesMsg()) {
|
||||
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData));
|
||||
}
|
||||
if (entityData.hasAttributesUpdatedMsg()) {
|
||||
metaData.putValue("scope", entityData.getPostAttributeScope());
|
||||
result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
|
||||
result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
|
||||
}
|
||||
if (entityData.hasPostTelemetryMsg()) {
|
||||
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
|
||||
@ -112,12 +111,16 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
if (EntityType.DEVICE.equals(entityId.getEntityType())) {
|
||||
DeviceId deviceId = new DeviceId(entityId.getId());
|
||||
|
||||
long currentTs = System.currentTimeMillis();
|
||||
|
||||
TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
|
||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
|
||||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
|
||||
.setLastActivityTime(System.currentTimeMillis()).build();
|
||||
.setLastActivityTime(currentTs).build();
|
||||
|
||||
log.trace("[{}][{}] device activity time is going to be updated, ts {}", tenantId, deviceId, currentTs);
|
||||
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
|
||||
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(),
|
||||
@ -130,12 +133,14 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
return result;
|
||||
}
|
||||
|
||||
private TbMsgMetaData constructBaseMsgMetadata(TenantId tenantId, EntityId entityId) {
|
||||
private Pair<TbMsgMetaData, CustomerId> getBaseMsgMetadataAndCustomerId(TenantId tenantId, EntityId entityId) {
|
||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||
CustomerId customerId = null;
|
||||
switch (entityId.getEntityType()) {
|
||||
case DEVICE:
|
||||
Device device = deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId()));
|
||||
if (device != null) {
|
||||
customerId = device.getCustomerId();
|
||||
metaData.putValue("deviceName", device.getName());
|
||||
metaData.putValue("deviceType", device.getType());
|
||||
}
|
||||
@ -143,6 +148,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
case ASSET:
|
||||
Asset asset = assetService.findAssetById(tenantId, new AssetId(entityId.getId()));
|
||||
if (asset != null) {
|
||||
customerId = asset.getCustomerId();
|
||||
metaData.putValue("assetName", asset.getName());
|
||||
metaData.putValue("assetType", asset.getType());
|
||||
}
|
||||
@ -150,38 +156,24 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
case ENTITY_VIEW:
|
||||
EntityView entityView = entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId()));
|
||||
if (entityView != null) {
|
||||
customerId = entityView.getCustomerId();
|
||||
metaData.putValue("entityViewName", entityView.getName());
|
||||
metaData.putValue("entityViewType", entityView.getType());
|
||||
}
|
||||
break;
|
||||
case EDGE:
|
||||
Edge edge = edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId()));
|
||||
if (edge != null) {
|
||||
customerId = edge.getCustomerId();
|
||||
metaData.putValue("edgeName", edge.getName());
|
||||
metaData.putValue("edgeType", edge.getType());
|
||||
}
|
||||
break;
|
||||
default:
|
||||
log.debug("Using empty metadata for entityId [{}]", entityId);
|
||||
break;
|
||||
}
|
||||
return metaData;
|
||||
}
|
||||
|
||||
private Pair<String, RuleChainId> getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) {
|
||||
RuleChainId ruleChainId = null;
|
||||
String queueName = null;
|
||||
if (EntityType.DEVICE.equals(entityId.getEntityType())) {
|
||||
DeviceProfile deviceProfile = deviceProfileCache.get(tenantId, new DeviceId(entityId.getId()));
|
||||
if (deviceProfile == null) {
|
||||
log.warn("[{}] Device profile is null!", entityId);
|
||||
} else {
|
||||
ruleChainId = deviceProfile.getDefaultRuleChainId();
|
||||
queueName = deviceProfile.getDefaultQueueName();
|
||||
}
|
||||
} else if (EntityType.ASSET.equals(entityId.getEntityType())) {
|
||||
AssetProfile assetProfile = assetProfileCache.get(tenantId, new AssetId(entityId.getId()));
|
||||
if (assetProfile == null) {
|
||||
log.warn("[{}] Asset profile is null!", entityId);
|
||||
} else {
|
||||
ruleChainId = assetProfile.getDefaultRuleChainId();
|
||||
queueName = assetProfile.getDefaultQueueName();
|
||||
}
|
||||
}
|
||||
return new ImmutablePair<>(queueName, ruleChainId);
|
||||
return new ImmutablePair<>(metaData, customerId != null ? customerId : new CustomerId(ModelConstants.NULL_UUID));
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> processPostTelemetry(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) {
|
||||
@ -207,6 +199,29 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
return futureToSet;
|
||||
}
|
||||
|
||||
private Pair<String, RuleChainId> getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) {
|
||||
RuleChainId ruleChainId = null;
|
||||
String queueName = null;
|
||||
if (EntityType.DEVICE.equals(entityId.getEntityType())) {
|
||||
DeviceProfile deviceProfile = deviceProfileCache.get(tenantId, new DeviceId(entityId.getId()));
|
||||
if (deviceProfile == null) {
|
||||
log.warn("[{}] Device profile is null!", entityId);
|
||||
} else {
|
||||
ruleChainId = deviceProfile.getDefaultRuleChainId();
|
||||
queueName = deviceProfile.getDefaultQueueName();
|
||||
}
|
||||
} else if (EntityType.ASSET.equals(entityId.getEntityType())) {
|
||||
AssetProfile assetProfile = assetProfileCache.get(tenantId, new AssetId(entityId.getId()));
|
||||
if (assetProfile == null) {
|
||||
log.warn("[{}] Asset profile is null!", entityId);
|
||||
} else {
|
||||
ruleChainId = assetProfile.getDefaultRuleChainId();
|
||||
queueName = assetProfile.getDefaultQueueName();
|
||||
}
|
||||
}
|
||||
return new ImmutablePair<>(queueName, ruleChainId);
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
|
||||
SettableFuture<Void> futureToSet = SettableFuture.create();
|
||||
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
|
||||
@ -228,6 +243,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> processAttributesUpdate(TenantId tenantId,
|
||||
CustomerId customerId,
|
||||
EntityId entityId,
|
||||
TransportProtos.PostAttributeMsg msg,
|
||||
TbMsgMetaData metaData) {
|
||||
@ -238,26 +254,34 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
tsSubService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback<Void>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Void tmp) {
|
||||
logAttributesUpdated(tenantId, entityId, scope, attributes, null);
|
||||
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
|
||||
TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), DataConstants.ATTRIBUTES_UPDATED, entityId,
|
||||
customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null);
|
||||
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
|
||||
@Override
|
||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||
futureToSet.set(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("Can't process attributes update [{}]", msg, t);
|
||||
logAttributesUpdated(tenantId, entityId, scope, attributes, t);
|
||||
futureToSet.setException(t);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.error("Can't process attributes update [{}]", msg, t);
|
||||
futureToSet.setException(t);
|
||||
}
|
||||
});
|
||||
return futureToSet;
|
||||
}
|
||||
|
||||
private void logAttributesUpdated(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, Throwable e) {
|
||||
notificationEntityService.logEntityAction(tenantId, entityId, ActionType.ATTRIBUTES_UPDATED, null,
|
||||
BaseController.toException(e), scope, attributes);
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) {
|
||||
private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg,
|
||||
String entityType) {
|
||||
SettableFuture<Void> futureToSet = SettableFuture.create();
|
||||
String scope = attributeDeleteMsg.getScope();
|
||||
List<String> attributeNames = attributeDeleteMsg.getAttributeNamesList();
|
||||
@ -284,29 +308,6 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
return futureToSet;
|
||||
}
|
||||
|
||||
private EntityId constructEntityId(EntityDataProto entityData) {
|
||||
EntityType entityType = EntityType.valueOf(entityData.getEntityType());
|
||||
switch (entityType) {
|
||||
case DEVICE:
|
||||
return new DeviceId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
|
||||
case ASSET:
|
||||
return new AssetId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
|
||||
case ENTITY_VIEW:
|
||||
return new EntityViewId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
|
||||
case DASHBOARD:
|
||||
return new DashboardId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
|
||||
case TENANT:
|
||||
return TenantId.fromUUID(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
|
||||
case CUSTOMER:
|
||||
return new CustomerId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
|
||||
case USER:
|
||||
return new UserId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
|
||||
default:
|
||||
log.warn("Unsupported entity type [{}] during construct of entity id. EntityDataProto [{}]", entityData.getEntityType(), entityData);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public DownlinkMsg convertTelemetryEventToDownlink(EdgeEvent edgeEvent) throws JsonProcessingException {
|
||||
EntityId entityId;
|
||||
switch (edgeEvent.getType()) {
|
||||
|
||||
@ -178,14 +178,14 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
|
||||
|
||||
@After
|
||||
public void afterTest() throws Exception {
|
||||
try {
|
||||
edgeImitator.disconnect();
|
||||
} catch (Exception ignored) {}
|
||||
|
||||
loginSysAdmin();
|
||||
|
||||
doDelete("/api/tenant/" + savedTenant.getUuidId())
|
||||
.andExpect(status().isOk());
|
||||
|
||||
try {
|
||||
edgeImitator.disconnect();
|
||||
} catch (Exception ignored) {}
|
||||
}
|
||||
|
||||
private void installation() {
|
||||
|
||||
@ -621,5 +621,7 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
|
||||
|
||||
Assert.assertEquals(JacksonUtil.OBJECT_MAPPER.createObjectNode().put(attrKey, attrValue),
|
||||
JacksonUtil.fromBytes(onUpdateCallback.getPayloadBytes()));
|
||||
|
||||
client.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,13 +18,13 @@ package org.thingsboard.server.edge;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.protobuf.AbstractMessage;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.EntityDataProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
|
||||
@ -36,12 +36,12 @@ abstract public class BaseTelemetryEdgeTest extends AbstractEdgeTest {
|
||||
public void testTimeseriesWithFailures() throws Exception {
|
||||
int numberOfTimeseriesToSend = 1000;
|
||||
|
||||
Device device = findDeviceByName("Edge Device 1");
|
||||
|
||||
edgeImitator.setRandomFailuresOnTimeseriesDownlink(true);
|
||||
// imitator will generate failure in 5% of cases
|
||||
edgeImitator.setFailureProbability(5.0);
|
||||
|
||||
edgeImitator.expectMessageAmount(numberOfTimeseriesToSend);
|
||||
Device device = findDeviceByName("Edge Device 1");
|
||||
for (int idx = 1; idx <= numberOfTimeseriesToSend; idx++) {
|
||||
String timeseriesData = "{\"data\":{\"idx\":" + idx + "},\"ts\":" + System.currentTimeMillis() + "}";
|
||||
JsonNode timeseriesEntityData = mapper.readTree(timeseriesData);
|
||||
@ -192,4 +192,38 @@ abstract public class BaseTelemetryEdgeTest extends AbstractEdgeTest {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeseriesDeliveryFailuresForever_deliverOnlyDeviceUpdateMsgs() throws Exception {
|
||||
int numberOfMsgsToSend = 100;
|
||||
|
||||
Device device = findDeviceByName("Edge Device 1");
|
||||
|
||||
edgeImitator.setRandomFailuresOnTimeseriesDownlink(true);
|
||||
// imitator will generate failure in 100% of timeseries cases
|
||||
edgeImitator.setFailureProbability(100);
|
||||
edgeImitator.expectMessageAmount(numberOfMsgsToSend);
|
||||
for (int idx = 1; idx <= numberOfMsgsToSend; idx++) {
|
||||
String timeseriesData = "{\"data\":{\"idx\":" + idx + "},\"ts\":" + System.currentTimeMillis() + "}";
|
||||
JsonNode timeseriesEntityData = mapper.readTree(timeseriesData);
|
||||
EdgeEvent failedEdgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED,
|
||||
device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData);
|
||||
edgeEventService.saveAsync(failedEdgeEvent).get();
|
||||
|
||||
EdgeEvent successEdgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.UPDATED,
|
||||
device.getId().getId(), EdgeEventType.DEVICE, null);
|
||||
edgeEventService.saveAsync(successEdgeEvent).get();
|
||||
|
||||
clusterService.onEdgeEventUpdate(tenantId, edge.getId());
|
||||
}
|
||||
|
||||
Assert.assertTrue(edgeImitator.waitForMessages(120));
|
||||
|
||||
List<EntityDataProto> allTelemetryMsgs = edgeImitator.findAllMessagesByType(EntityDataProto.class);
|
||||
Assert.assertTrue(allTelemetryMsgs.isEmpty());
|
||||
|
||||
List<DeviceUpdateMsg> deviceUpdateMsgs = edgeImitator.findAllMessagesByType(DeviceUpdateMsg.class);
|
||||
Assert.assertEquals(numberOfMsgsToSend, deviceUpdateMsgs.size());
|
||||
|
||||
edgeImitator.setRandomFailuresOnTimeseriesDownlink(false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -106,6 +106,7 @@ public class EdgeImitator {
|
||||
this.routingSecret = routingSecret;
|
||||
setEdgeCredentials("rpcHost", host);
|
||||
setEdgeCredentials("rpcPort", port);
|
||||
setEdgeCredentials("timeoutSecs", 3);
|
||||
setEdgeCredentials("keepAliveTimeSec", 300);
|
||||
}
|
||||
|
||||
@ -151,23 +152,19 @@ public class EdgeImitator {
|
||||
Futures.addCallback(future, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable List<Void> result) {
|
||||
if (connected) {
|
||||
DownlinkResponseMsg downlinkResponseMsg = DownlinkResponseMsg.newBuilder()
|
||||
.setDownlinkMsgId(downlinkMsg.getDownlinkMsgId())
|
||||
.setSuccess(true).build();
|
||||
edgeRpcClient.sendDownlinkResponseMsg(downlinkResponseMsg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
if (connected) {
|
||||
DownlinkResponseMsg downlinkResponseMsg = DownlinkResponseMsg.newBuilder()
|
||||
.setDownlinkMsgId(downlinkMsg.getDownlinkMsgId())
|
||||
.setSuccess(false).setErrorMsg(t.getMessage()).build();
|
||||
edgeRpcClient.sendDownlinkResponseMsg(downlinkResponseMsg);
|
||||
}
|
||||
}
|
||||
}, MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
|
||||
@ -87,6 +87,8 @@ public class EntityIdFactory {
|
||||
|
||||
public static EntityId getByEdgeEventTypeAndUuid(EdgeEventType edgeEventType, UUID uuid) {
|
||||
switch (edgeEventType) {
|
||||
case TENANT:
|
||||
return new TenantId(uuid);
|
||||
case CUSTOMER:
|
||||
return new CustomerId(uuid);
|
||||
case USER:
|
||||
|
||||
@ -142,7 +142,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
log.debug("[{}] The rpc session received an error!", edgeKey, t);
|
||||
log.warn("[{}] Stream was terminated due to error:", edgeKey, t);
|
||||
try {
|
||||
EdgeGrpcClient.this.disconnect(true);
|
||||
} catch (InterruptedException e) {
|
||||
@ -153,7 +153,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
log.debug("[{}] The rpc session was closed!", edgeKey);
|
||||
log.info("[{}] Stream was closed and completed successfully!", edgeKey);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user