Improve logic to group latest by ts

This commit is contained in:
Volodymyr Babak 2023-01-10 10:28:14 +02:00
parent d0847bb80a
commit 7cc1a3f33b
3 changed files with 58 additions and 46 deletions

View File

@ -22,6 +22,7 @@ import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@ -334,7 +335,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
return null;
}
return constructEntityDataProtoMsg(entityId, edgeEvent.getAction(),
JsonUtils.parse(JacksonUtil.OBJECT_MAPPER.writeValueAsString(edgeEvent.getBody())));
JsonParser.parseString(JacksonUtil.OBJECT_MAPPER.writeValueAsString(edgeEvent.getBody())));
}
private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, EdgeEventActionType actionType, JsonElement entityData) {

View File

@ -137,48 +137,49 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
}
String scope = attributesRequestMsg.getScope();
ListenableFuture<List<AttributeKvEntry>> findAttrFuture = attributesService.findAll(tenantId, entityId, scope);
return Futures.transformAsync(findAttrFuture, ssAttributes -> {
if (ssAttributes == null || ssAttributes.isEmpty()) {
log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId,
edge.getName(),
entityId.getEntityType(),
entityId.getId());
return Futures.immediateFuture(null);
}
return processEntityAttributesAndAddToEdgeQueue(tenantId, entityId, edge, entityType, scope, ssAttributes, attributesRequestMsg);
}, dbCallbackExecutorService);
return Futures.transformAsync(findAttrFuture, ssAttributes
-> processEntityAttributesAndAddToEdgeQueue(tenantId, entityId, edge, entityType, scope, ssAttributes, attributesRequestMsg),
dbCallbackExecutorService);
}
private ListenableFuture<Void> processEntityAttributesAndAddToEdgeQueue(TenantId tenantId, EntityId entityId, Edge edge,
EdgeEventType entityType, String scope, List<AttributeKvEntry> ssAttributes,
AttributesRequestMsg attributesRequestMsg) {
try {
Map<String, Object> entityData = new HashMap<>();
ObjectNode attributes = JacksonUtil.OBJECT_MAPPER.createObjectNode();
for (AttributeKvEntry attr : ssAttributes) {
if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey())
&& !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) {
continue;
}
if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
attributes.put(attr.getKey(), attr.getBooleanValue().get());
} else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) {
attributes.put(attr.getKey(), attr.getDoubleValue().get());
} else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) {
attributes.put(attr.getKey(), attr.getLongValue().get());
} else {
attributes.put(attr.getKey(), attr.getValueAsString());
}
}
ListenableFuture<Void> future;
if (attributes.size() > 0) {
entityData.put("kv", attributes);
entityData.put("scope", scope);
JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData);
log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body);
future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body);
} else {
if (ssAttributes == null || ssAttributes.isEmpty()) {
log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId,
edge.getName(),
entityId.getEntityType(),
entityId.getId());
future = Futures.immediateFuture(null);
} else {
Map<String, Object> entityData = new HashMap<>();
ObjectNode attributes = JacksonUtil.OBJECT_MAPPER.createObjectNode();
for (AttributeKvEntry attr : ssAttributes) {
if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey())
&& !DefaultDeviceStateService.INACTIVITY_TIMEOUT.equals(attr.getKey())) {
continue;
}
if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
attributes.put(attr.getKey(), attr.getBooleanValue().get());
} else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) {
attributes.put(attr.getKey(), attr.getDoubleValue().get());
} else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) {
attributes.put(attr.getKey(), attr.getLongValue().get());
} else {
attributes.put(attr.getKey(), attr.getValueAsString());
}
}
if (attributes.size() > 0) {
entityData.put("kv", attributes);
entityData.put("scope", scope);
JsonNode body = JacksonUtil.OBJECT_MAPPER.valueToTree(entityData);
log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", entityId, body);
future = saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.ATTRIBUTES_UPDATED, entityId, body);
} else {
future = Futures.immediateFuture(null);
}
}
return Futures.transformAsync(future, v -> processLatestTimeseriesAndAddToEdgeQueue(tenantId, entityId, edge, entityType), dbCallbackExecutorService);
} catch (Exception e) {
@ -199,16 +200,18 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
entityId.getId());
return Futures.immediateFuture(null);
}
List<ListenableFuture<Void>> futures = new ArrayList<>();
Map<Long, Map<String, Object>> tsData = new HashMap<>();
for (TsKvEntry tsKvEntry : tsKvEntries) {
if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(tsKvEntry.getKey())) {
continue;
}
ObjectNode entityBody = JacksonUtil.OBJECT_MAPPER.createObjectNode();
ObjectNode ts = JacksonUtil.OBJECT_MAPPER.createObjectNode();
ts.put(tsKvEntry.getKey(), tsKvEntry.getValueAsString());
entityBody.set("data", ts);
entityBody.put("ts", tsKvEntry.getTs());
tsData.computeIfAbsent(tsKvEntry.getTs(), k -> new HashMap<>()).put(tsKvEntry.getKey(), tsKvEntry.getValue());
}
List<ListenableFuture<Void>> futures = new ArrayList<>();
for (Map.Entry<Long, Map<String, Object>> entry : tsData.entrySet()) {
Map<String, Object> entityBody = new HashMap<>();
entityBody.put("data", entry.getValue());
entityBody.put("ts", entry.getKey());
futures.add(saveEdgeEvent(tenantId, edge.getId(), entityType, EdgeEventActionType.TIMESERIES_UPDATED, entityId, JacksonUtil.valueToTree(entityBody)));
}
return Futures.transform(Futures.allAsList(futures), v -> null, dbCallbackExecutorService);

View File

@ -701,7 +701,7 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
public void testVerifyDeliveryOfLatestTimeseriesOnAttributesRequest() throws Exception {
Device device = findDeviceByName("Edge Device 1");
JsonNode timeseriesData = mapper.readTree("{\"temperature\":25}");
JsonNode timeseriesData = mapper.readTree("{\"temperature\":25, \"isEnabled\": true}");
doPost("/api/plugins/telemetry/DEVICE/" + device.getUuidId() + "/timeseries/" + DataConstants.SERVER_SCOPE,
timeseriesData);
@ -740,9 +740,17 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
TransportProtos.PostTelemetryMsg timeseriesUpdatedMsg = latestEntityDataMsg.getPostTelemetryMsg();
Assert.assertEquals(1, timeseriesUpdatedMsg.getTsKvListList().size());
TransportProtos.TsKvListProto tsKvListProto = timeseriesUpdatedMsg.getTsKvListList().get(0);
Assert.assertEquals(1, tsKvListProto.getKvList().size());
TransportProtos.KeyValueProto keyValueProto = tsKvListProto.getKvList().get(0);
Assert.assertEquals(25, keyValueProto.getLongV());
Assert.assertEquals("temperature", keyValueProto.getKey());
Assert.assertEquals(2, tsKvListProto.getKvList().size());
for (TransportProtos.KeyValueProto keyValueProto : tsKvListProto.getKvList()) {
if ("temperature".equals(keyValueProto.getKey())) {
Assert.assertEquals(TransportProtos.KeyValueType.LONG_V, keyValueProto.getType());
Assert.assertEquals(25, keyValueProto.getLongV());
} else if ("isEnabled".equals(keyValueProto.getKey())) {
Assert.assertEquals(TransportProtos.KeyValueType.BOOLEAN_V, keyValueProto.getType());
Assert.assertTrue(keyValueProto.getBoolV());
} else {
Assert.fail("Unexpected key: " + keyValueProto.getKey());
}
}
}
}