Fixed incorrect api usage count by customer. Fixed duplciate ATTRIBUTE_UPDATE events

This commit is contained in:
Volodymyr Babak 2022-10-26 18:21:19 +03:00
parent 03bc439bb9
commit 8fdeecdf07
2 changed files with 71 additions and 43 deletions

View File

@ -560,7 +560,7 @@ public final class EdgeGrpcSession implements Closeable {
try { try {
if (uplinkMsg.getEntityDataCount() > 0) { if (uplinkMsg.getEntityDataCount() > 0) {
for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) {
result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData)); result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), entityData));
} }
} }
if (uplinkMsg.getDeviceUpdateMsgCount() > 0) { if (uplinkMsg.getDeviceUpdateMsgCount() > 0) {

View File

@ -17,7 +17,6 @@ package org.thingsboard.server.service.edge.rpc.processor;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson; import com.google.gson.Gson;
@ -36,8 +35,8 @@ import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.asset.AssetProfile; import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetId;
@ -59,7 +58,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.util.JsonUtils; import org.thingsboard.server.common.transport.util.JsonUtils;
import org.thingsboard.server.controller.BaseController; import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg; import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.edge.v1.EntityDataProto;
@ -92,19 +91,21 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
} }
public List<ListenableFuture<Void>> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { public List<ListenableFuture<Void>> processTelemetryFromEdge(TenantId tenantId, EntityDataProto entityData) {
log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); log.trace("[{}] processTelemetryFromEdge [{}]", tenantId, entityData);
List<ListenableFuture<Void>> result = new ArrayList<>(); List<ListenableFuture<Void>> result = new ArrayList<>();
EntityId entityId = constructEntityId(entityData); EntityId entityId = constructEntityId(entityData);
if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) { if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) {
TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId); Pair<TbMsgMetaData, CustomerId> pair = getBaseMsgMetadataAndCustomerId(tenantId, entityId);
TbMsgMetaData metaData = pair.getKey();
CustomerId customerId = pair.getValue();
metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE);
if (entityData.hasPostAttributesMsg()) { if (entityData.hasPostAttributesMsg()) {
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData));
} }
if (entityData.hasAttributesUpdatedMsg()) { if (entityData.hasAttributesUpdatedMsg()) {
metaData.putValue("scope", entityData.getPostAttributeScope()); metaData.putValue("scope", entityData.getPostAttributeScope());
result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData));
} }
if (entityData.hasPostTelemetryMsg()) { if (entityData.hasPostTelemetryMsg()) {
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
@ -112,12 +113,16 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
if (EntityType.DEVICE.equals(entityId.getEntityType())) { if (EntityType.DEVICE.equals(entityId.getEntityType())) {
DeviceId deviceId = new DeviceId(entityId.getId()); DeviceId deviceId = new DeviceId(entityId.getId());
long currentTs = System.currentTimeMillis();
TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) .setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(System.currentTimeMillis()).build(); .setLastActivityTime(currentTs).build();
log.trace("[{}][{}] device activity time is going to be updated, ts {}", tenantId, deviceId, currentTs);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(),
@ -130,12 +135,14 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
return result; return result;
} }
private TbMsgMetaData constructBaseMsgMetadata(TenantId tenantId, EntityId entityId) { private Pair<TbMsgMetaData, CustomerId> getBaseMsgMetadataAndCustomerId(TenantId tenantId, EntityId entityId) {
TbMsgMetaData metaData = new TbMsgMetaData(); TbMsgMetaData metaData = new TbMsgMetaData();
CustomerId customerId = null;
switch (entityId.getEntityType()) { switch (entityId.getEntityType()) {
case DEVICE: case DEVICE:
Device device = deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())); Device device = deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId()));
if (device != null) { if (device != null) {
customerId = device.getCustomerId();
metaData.putValue("deviceName", device.getName()); metaData.putValue("deviceName", device.getName());
metaData.putValue("deviceType", device.getType()); metaData.putValue("deviceType", device.getType());
} }
@ -143,6 +150,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
case ASSET: case ASSET:
Asset asset = assetService.findAssetById(tenantId, new AssetId(entityId.getId())); Asset asset = assetService.findAssetById(tenantId, new AssetId(entityId.getId()));
if (asset != null) { if (asset != null) {
customerId = asset.getCustomerId();
metaData.putValue("assetName", asset.getName()); metaData.putValue("assetName", asset.getName());
metaData.putValue("assetType", asset.getType()); metaData.putValue("assetType", asset.getType());
} }
@ -150,38 +158,24 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
case ENTITY_VIEW: case ENTITY_VIEW:
EntityView entityView = entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())); EntityView entityView = entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId()));
if (entityView != null) { if (entityView != null) {
customerId = entityView.getCustomerId();
metaData.putValue("entityViewName", entityView.getName()); metaData.putValue("entityViewName", entityView.getName());
metaData.putValue("entityViewType", entityView.getType()); metaData.putValue("entityViewType", entityView.getType());
} }
break; break;
case EDGE:
Edge edge = edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId()));
if (edge != null) {
customerId = edge.getCustomerId();
metaData.putValue("edgeName", edge.getName());
metaData.putValue("edgeType", edge.getType());
}
break;
default: default:
log.debug("Using empty metadata for entityId [{}]", entityId); log.debug("Using empty metadata for entityId [{}]", entityId);
break; break;
} }
return metaData; return new ImmutablePair<>(metaData, customerId != null ? customerId : new CustomerId(ModelConstants.NULL_UUID));
}
private Pair<String, RuleChainId> getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) {
RuleChainId ruleChainId = null;
String queueName = null;
if (EntityType.DEVICE.equals(entityId.getEntityType())) {
DeviceProfile deviceProfile = deviceProfileCache.get(tenantId, new DeviceId(entityId.getId()));
if (deviceProfile == null) {
log.warn("[{}] Device profile is null!", entityId);
} else {
ruleChainId = deviceProfile.getDefaultRuleChainId();
queueName = deviceProfile.getDefaultQueueName();
}
} else if (EntityType.ASSET.equals(entityId.getEntityType())) {
AssetProfile assetProfile = assetProfileCache.get(tenantId, new AssetId(entityId.getId()));
if (assetProfile == null) {
log.warn("[{}] Asset profile is null!", entityId);
} else {
ruleChainId = assetProfile.getDefaultRuleChainId();
queueName = assetProfile.getDefaultQueueName();
}
}
return new ImmutablePair<>(queueName, ruleChainId);
} }
private ListenableFuture<Void> processPostTelemetry(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { private ListenableFuture<Void> processPostTelemetry(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) {
@ -207,6 +201,29 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
return futureToSet; return futureToSet;
} }
private Pair<String, RuleChainId> getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) {
RuleChainId ruleChainId = null;
String queueName = null;
if (EntityType.DEVICE.equals(entityId.getEntityType())) {
DeviceProfile deviceProfile = deviceProfileCache.get(tenantId, new DeviceId(entityId.getId()));
if (deviceProfile == null) {
log.warn("[{}] Device profile is null!", entityId);
} else {
ruleChainId = deviceProfile.getDefaultRuleChainId();
queueName = deviceProfile.getDefaultQueueName();
}
} else if (EntityType.ASSET.equals(entityId.getEntityType())) {
AssetProfile assetProfile = assetProfileCache.get(tenantId, new AssetId(entityId.getId()));
if (assetProfile == null) {
log.warn("[{}] Asset profile is null!", entityId);
} else {
ruleChainId = assetProfile.getDefaultRuleChainId();
queueName = assetProfile.getDefaultQueueName();
}
}
return new ImmutablePair<>(queueName, ruleChainId);
}
private ListenableFuture<Void> processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { private ListenableFuture<Void> processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
SettableFuture<Void> futureToSet = SettableFuture.create(); SettableFuture<Void> futureToSet = SettableFuture.create();
JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
@ -228,6 +245,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
} }
private ListenableFuture<Void> processAttributesUpdate(TenantId tenantId, private ListenableFuture<Void> processAttributesUpdate(TenantId tenantId,
CustomerId customerId,
EntityId entityId, EntityId entityId,
TransportProtos.PostAttributeMsg msg, TransportProtos.PostAttributeMsg msg,
TbMsgMetaData metaData) { TbMsgMetaData metaData) {
@ -238,26 +256,34 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
tsSubService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback<Void>() { tsSubService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback<Void>() {
@Override @Override
public void onSuccess(@Nullable Void tmp) { public void onSuccess(@Nullable Void tmp) {
logAttributesUpdated(tenantId, entityId, scope, attributes, null); var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), DataConstants.ATTRIBUTES_UPDATED, entityId,
customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null);
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
futureToSet.set(null); futureToSet.set(null);
} }
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.error("Can't process attributes update [{}]", msg, t); log.error("Can't process attributes update [{}]", msg, t);
logAttributesUpdated(tenantId, entityId, scope, attributes, t); futureToSet.setException(t);
}
});
}
@Override
public void onFailure(Throwable t) {
log.error("Can't process attributes update [{}]", msg, t);
futureToSet.setException(t); futureToSet.setException(t);
} }
}); });
return futureToSet; return futureToSet;
} }
private void logAttributesUpdated(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, Throwable e) { private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg,
notificationEntityService.logEntityAction(tenantId, entityId, ActionType.ATTRIBUTES_UPDATED, null, String entityType) {
BaseController.toException(e), scope, attributes);
}
private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) {
SettableFuture<Void> futureToSet = SettableFuture.create(); SettableFuture<Void> futureToSet = SettableFuture.create();
String scope = attributeDeleteMsg.getScope(); String scope = attributeDeleteMsg.getScope();
List<String> attributeNames = attributeDeleteMsg.getAttributeNamesList(); List<String> attributeNames = attributeDeleteMsg.getAttributeNamesList();
@ -301,6 +327,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
return new CustomerId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB())); return new CustomerId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
case USER: case USER:
return new UserId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB())); return new UserId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
case EDGE:
return new EdgeId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
default: default:
log.warn("Unsupported entity type [{}] during construct of entity id. EntityDataProto [{}]", entityData.getEntityType(), entityData); log.warn("Unsupported entity type [{}] during construct of entity id. EntityDataProto [{}]", entityData.getEntityType(), entityData);
return null; return null;