diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 940d02918a..abadb7ef90 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -560,7 +560,7 @@ public final class EdgeGrpcSession implements Closeable { try { if (uplinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { - result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData)); + result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), entityData)); } } if (uplinkMsg.getDeviceUpdateMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index 98a0476178..e326616963 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -17,7 +17,6 @@ package org.thingsboard.server.service.edge.rpc.processor; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; @@ -36,8 +35,8 @@ import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.asset.Asset; -import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.asset.AssetProfile; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.id.AssetId; @@ -59,7 +58,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; -import org.thingsboard.server.controller.BaseController; +import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EntityDataProto; @@ -92,19 +91,21 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); } - public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { - log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); + public List> processTelemetryFromEdge(TenantId tenantId, EntityDataProto entityData) { + log.trace("[{}] processTelemetryFromEdge [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData); if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) { - TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId); + Pair pair = getBaseMsgMetadataAndCustomerId(tenantId, entityId); + TbMsgMetaData metaData = pair.getKey(); + CustomerId customerId = pair.getValue(); metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); if (entityData.hasPostAttributesMsg()) { result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); } if (entityData.hasAttributesUpdatedMsg()) { metaData.putValue("scope", entityData.getPostAttributeScope()); - result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); + result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); } if (entityData.hasPostTelemetryMsg()) { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); @@ -112,12 +113,16 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { if (EntityType.DEVICE.equals(entityId.getEntityType())) { DeviceId deviceId = new DeviceId(entityId.getId()); + long currentTs = System.currentTimeMillis(); + TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) - .setLastActivityTime(System.currentTimeMillis()).build(); + .setLastActivityTime(currentTs).build(); + + log.trace("[{}][{}] device activity time is going to be updated, ts {}", tenantId, deviceId, currentTs); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), @@ -130,12 +135,14 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { return result; } - private TbMsgMetaData constructBaseMsgMetadata(TenantId tenantId, EntityId entityId) { + private Pair getBaseMsgMetadataAndCustomerId(TenantId tenantId, EntityId entityId) { TbMsgMetaData metaData = new TbMsgMetaData(); + CustomerId customerId = null; switch (entityId.getEntityType()) { case DEVICE: Device device = deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())); if (device != null) { + customerId = device.getCustomerId(); metaData.putValue("deviceName", device.getName()); metaData.putValue("deviceType", device.getType()); } @@ -143,6 +150,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { case ASSET: Asset asset = assetService.findAssetById(tenantId, new AssetId(entityId.getId())); if (asset != null) { + customerId = asset.getCustomerId(); metaData.putValue("assetName", asset.getName()); metaData.putValue("assetType", asset.getType()); } @@ -150,38 +158,24 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { case ENTITY_VIEW: EntityView entityView = entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())); if (entityView != null) { + customerId = entityView.getCustomerId(); metaData.putValue("entityViewName", entityView.getName()); metaData.putValue("entityViewType", entityView.getType()); } break; + case EDGE: + Edge edge = edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())); + if (edge != null) { + customerId = edge.getCustomerId(); + metaData.putValue("edgeName", edge.getName()); + metaData.putValue("edgeType", edge.getType()); + } + break; default: log.debug("Using empty metadata for entityId [{}]", entityId); break; } - return metaData; - } - - private Pair getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) { - RuleChainId ruleChainId = null; - String queueName = null; - if (EntityType.DEVICE.equals(entityId.getEntityType())) { - DeviceProfile deviceProfile = deviceProfileCache.get(tenantId, new DeviceId(entityId.getId())); - if (deviceProfile == null) { - log.warn("[{}] Device profile is null!", entityId); - } else { - ruleChainId = deviceProfile.getDefaultRuleChainId(); - queueName = deviceProfile.getDefaultQueueName(); - } - } else if (EntityType.ASSET.equals(entityId.getEntityType())) { - AssetProfile assetProfile = assetProfileCache.get(tenantId, new AssetId(entityId.getId())); - if (assetProfile == null) { - log.warn("[{}] Asset profile is null!", entityId); - } else { - ruleChainId = assetProfile.getDefaultRuleChainId(); - queueName = assetProfile.getDefaultQueueName(); - } - } - return new ImmutablePair<>(queueName, ruleChainId); + return new ImmutablePair<>(metaData, customerId != null ? customerId : new CustomerId(ModelConstants.NULL_UUID)); } private ListenableFuture processPostTelemetry(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { @@ -207,6 +201,29 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { return futureToSet; } + private Pair getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) { + RuleChainId ruleChainId = null; + String queueName = null; + if (EntityType.DEVICE.equals(entityId.getEntityType())) { + DeviceProfile deviceProfile = deviceProfileCache.get(tenantId, new DeviceId(entityId.getId())); + if (deviceProfile == null) { + log.warn("[{}] Device profile is null!", entityId); + } else { + ruleChainId = deviceProfile.getDefaultRuleChainId(); + queueName = deviceProfile.getDefaultQueueName(); + } + } else if (EntityType.ASSET.equals(entityId.getEntityType())) { + AssetProfile assetProfile = assetProfileCache.get(tenantId, new AssetId(entityId.getId())); + if (assetProfile == null) { + log.warn("[{}] Asset profile is null!", entityId); + } else { + ruleChainId = assetProfile.getDefaultRuleChainId(); + queueName = assetProfile.getDefaultQueueName(); + } + } + return new ImmutablePair<>(queueName, ruleChainId); + } + private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); @@ -228,6 +245,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { } private ListenableFuture processAttributesUpdate(TenantId tenantId, + CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { @@ -238,26 +256,34 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { tsSubService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback() { @Override public void onSuccess(@Nullable Void tmp) { - logAttributesUpdated(tenantId, entityId, scope, attributes, null); - futureToSet.set(null); + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); + TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), DataConstants.ATTRIBUTES_UPDATED, entityId, + customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null); + tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("Can't process attributes update [{}]", msg, t); + futureToSet.setException(t); + } + }); } @Override public void onFailure(Throwable t) { log.error("Can't process attributes update [{}]", msg, t); - logAttributesUpdated(tenantId, entityId, scope, attributes, t); futureToSet.setException(t); } }); return futureToSet; } - private void logAttributesUpdated(TenantId tenantId, EntityId entityId, String scope, List attributes, Throwable e) { - notificationEntityService.logEntityAction(tenantId, entityId, ActionType.ATTRIBUTES_UPDATED, null, - BaseController.toException(e), scope, attributes); - } - - private ListenableFuture processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) { + private ListenableFuture processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, + String entityType) { SettableFuture futureToSet = SettableFuture.create(); String scope = attributeDeleteMsg.getScope(); List attributeNames = attributeDeleteMsg.getAttributeNamesList(); @@ -301,6 +327,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { return new CustomerId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB())); case USER: return new UserId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB())); + case EDGE: + return new EdgeId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB())); default: log.warn("Unsupported entity type [{}] during construct of entity id. EntityDataProto [{}]", entityData.getEntityType(), entityData); return null;