Report device activity from edge service - set active flag to true on cloud
This commit is contained in:
parent
9afdd97363
commit
6adf0f256e
@ -571,7 +571,7 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
try {
|
||||
if (uplinkMsg.getEntityDataCount() > 0) {
|
||||
for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) {
|
||||
result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData));
|
||||
result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData, sessionId));
|
||||
}
|
||||
}
|
||||
if (uplinkMsg.getDeviceUpdateMsgCount() > 0) {
|
||||
|
||||
@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
@ -52,6 +53,8 @@ import org.thingsboard.server.dao.service.DataValidator;
|
||||
import org.thingsboard.server.dao.user.UserService;
|
||||
import org.thingsboard.server.dao.widget.WidgetTypeService;
|
||||
import org.thingsboard.server.dao.widget.WidgetsBundleService;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor;
|
||||
import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor;
|
||||
import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor;
|
||||
@ -137,6 +140,13 @@ public abstract class BaseEdgeProcessor {
|
||||
@Autowired
|
||||
protected WidgetTypeService widgetTypeService;
|
||||
|
||||
@Autowired
|
||||
protected PartitionService partitionService;
|
||||
|
||||
@Autowired
|
||||
@Lazy
|
||||
protected TbQueueProducerProvider producerProvider;
|
||||
|
||||
@Autowired
|
||||
protected DataValidator<Device> deviceValidator;
|
||||
|
||||
|
||||
@ -52,6 +52,8 @@ import org.thingsboard.server.common.data.kv.AttributeKey;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
||||
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
||||
import org.thingsboard.server.common.transport.util.JsonUtils;
|
||||
@ -61,9 +63,12 @@ import org.thingsboard.server.gen.edge.v1.EntityDataProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.TbQueueCallback;
|
||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
||||
import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -77,14 +82,19 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
|
||||
private final Gson gson = new Gson();
|
||||
|
||||
public List<ListenableFuture<Void>> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) {
|
||||
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> tbCoreMsgProducer;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
|
||||
}
|
||||
|
||||
public List<ListenableFuture<Void>> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData, UUID edgeSessionId) {
|
||||
log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData);
|
||||
List<ListenableFuture<Void>> result = new ArrayList<>();
|
||||
EntityId entityId = constructEntityId(entityData);
|
||||
if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) {
|
||||
// @voba - in terms of performance we should not fetch device from DB by id
|
||||
// TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId);
|
||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||
TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId);
|
||||
metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE);
|
||||
if (entityData.hasPostAttributesMsg()) {
|
||||
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData));
|
||||
@ -96,6 +106,11 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
if (entityData.hasPostTelemetryMsg()) {
|
||||
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
|
||||
}
|
||||
if (EntityType.DEVICE.equals(entityId.getEntityType())) {
|
||||
Device device = deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId()));
|
||||
// for edge context sessionId is exact edgeSessionId
|
||||
reportActivity(device, edgeSessionId);
|
||||
}
|
||||
}
|
||||
if (entityData.hasAttributeDeleteMsg()) {
|
||||
result.add(processAttributeDeleteMsg(tenantId, entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType()));
|
||||
@ -103,6 +118,37 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
return result;
|
||||
}
|
||||
|
||||
private void reportActivity(Device device, UUID sessionId) {
|
||||
TransportProtos.SessionInfoProto.Builder builder = TransportProtos.SessionInfoProto.newBuilder()
|
||||
.setSessionIdMSB(sessionId.getMostSignificantBits())
|
||||
.setSessionIdLSB(sessionId.getLeastSignificantBits())
|
||||
.setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())
|
||||
.setDeviceIdMSB(device.getId().getId().getMostSignificantBits())
|
||||
.setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())
|
||||
.setDeviceName(device.getName())
|
||||
.setDeviceType(device.getType())
|
||||
.setDeviceProfileIdMSB(device.getDeviceProfileId().getId().getMostSignificantBits())
|
||||
.setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits());
|
||||
|
||||
if (device.getCustomerId() != null && !device.getCustomerId().isNullUid()) {
|
||||
builder.setCustomerIdMSB(device.getCustomerId().getId().getMostSignificantBits());
|
||||
builder.setCustomerIdLSB(device.getCustomerId().getId().getLeastSignificantBits());
|
||||
}
|
||||
reportActivity(device.getTenantId(), device.getId(), device.getUuidId(), builder.build());
|
||||
}
|
||||
|
||||
private void reportActivity(TenantId tenantId, DeviceId deviceId, UUID routingKey, TransportProtos.SessionInfoProto sessionInfo) {
|
||||
TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder()
|
||||
.setAttributeSubscription(false).setRpcSubscription(false)
|
||||
.setLastActivityTime(System.currentTimeMillis()).build();
|
||||
TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
|
||||
.setSubscriptionInfo(subscriptionInfoProto).build();
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
|
||||
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey,
|
||||
TransportProtos.ToCoreMsg.newBuilder().setToDeviceActorMsg(msg).build()), null);
|
||||
}
|
||||
|
||||
private TbMsgMetaData constructBaseMsgMetadata(TenantId tenantId, EntityId entityId) {
|
||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||
switch (entityId.getEntityType()) {
|
||||
|
||||
@ -1339,17 +1339,17 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
String timeseriesKey = "key";
|
||||
String timeseriesValue = "25";
|
||||
data.addProperty(timeseriesKey, timeseriesValue);
|
||||
UplinkMsg.Builder uplinkMsgBuilder1 = UplinkMsg.newBuilder();
|
||||
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
|
||||
EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder();
|
||||
entityDataBuilder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data, System.currentTimeMillis()));
|
||||
entityDataBuilder.setEntityType(device.getId().getEntityType().name());
|
||||
entityDataBuilder.setEntityIdMSB(device.getUuidId().getMostSignificantBits());
|
||||
entityDataBuilder.setEntityIdLSB(device.getUuidId().getLeastSignificantBits());
|
||||
testAutoGeneratedCodeByProtobuf(entityDataBuilder);
|
||||
uplinkMsgBuilder1.addEntityData(entityDataBuilder.build());
|
||||
uplinkMsgBuilder.addEntityData(entityDataBuilder.build());
|
||||
|
||||
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder1);
|
||||
edgeImitator.sendUplinkMsg(uplinkMsgBuilder1.build());
|
||||
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
|
||||
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
|
||||
|
||||
JsonObject attributesData = new JsonObject();
|
||||
String attributesKey = "test_attr";
|
||||
@ -1381,9 +1381,22 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
|
||||
String attributeValuesUrl = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/attributes/" + DataConstants.SERVER_SCOPE;
|
||||
List<Map<String, String>> attributes = doGetAsyncTyped(attributeValuesUrl, new TypeReference<>() {});
|
||||
Assert.assertEquals(2, attributes.size());
|
||||
var result = attributes.stream().filter(kv -> kv.get("key").equals(attributesKey)).filter(kv -> kv.get("value").equals(attributesValue)).findFirst();
|
||||
Assert.assertTrue(result.isPresent());
|
||||
|
||||
Assert.assertEquals(3, attributes.size());
|
||||
|
||||
Optional<Map<String, String>> activeAttributeOpt = getAttributeByKey("active", attributes);
|
||||
Assert.assertTrue(activeAttributeOpt.isPresent());
|
||||
Map<String, String> activeAttribute = activeAttributeOpt.get();
|
||||
Assert.assertEquals("true", activeAttribute.get("value"));
|
||||
|
||||
Optional<Map<String, String>> customAttributeOpt = getAttributeByKey(attributesKey, attributes);
|
||||
Assert.assertTrue(customAttributeOpt.isPresent());
|
||||
Map<String, String> customAttribute = customAttributeOpt.get();
|
||||
Assert.assertEquals(attributesValue, customAttribute.get("value"));
|
||||
}
|
||||
|
||||
private Optional<Map<String, String>> getAttributeByKey(String key, List<Map<String, String>> attributes) {
|
||||
return attributes.stream().filter(kv -> kv.get("key").equals(key)).findFirst();
|
||||
}
|
||||
|
||||
private Map<String, List<Map<String, String>>> loadDeviceTimeseries(Device device, String timeseriesKey) throws Exception {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user