Fix Telemetry subscription service.

This commit is contained in:
Igor Kulikov 2018-11-05 19:46:33 +02:00
parent 766a56fa9e
commit 4d6ee559fd

View File

@ -35,18 +35,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.*;
import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.dao.attributes.AttributesService;
@ -381,7 +370,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
private void addRemoteWsSubscription(ServerAddress address, String sessionId, Subscription subscription) {
EntityId entityId = subscription.getEntityId();
log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
log.trace("[{}] Registering remote subscription [{}] for entity [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address);
registerSubscription(sessionId, entityId, subscription);
if (subscription.getType() == TelemetryFeature.ATTRIBUTES) {
final Map<String, Long> keyStates = subscription.getKeyStates();
@ -401,17 +390,22 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio
long curTs = System.currentTimeMillis();
List<ReadTsKvQuery> queries = new ArrayList<>();
subscription.getKeyStates().entrySet().forEach(e -> {
queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs));
if (curTs > e.getValue()) {
queries.add(new BaseReadTsKvQuery(e.getKey(), e.getValue() + 1L, curTs, 0, 1000, Aggregation.NONE));
} else {
log.debug("[{}] Invalid subscription [{}], entityId [{}] curTs [{}]", sessionId, subscription, entityId, curTs);
}
});
DonAsynchron.withCallback(tsService.findAll(entityId, queries),
missedUpdates -> {
if (missedUpdates != null && !missedUpdates.isEmpty()) {
tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
}
},
e -> log.error("Failed to fetch missed updates.", e),
tsCallBackExecutor);
if (!queries.isEmpty()) {
DonAsynchron.withCallback(tsService.findAll(entityId, queries),
missedUpdates -> {
if (missedUpdates != null && !missedUpdates.isEmpty()) {
tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates));
}
},
e -> log.error("Failed to fetch missed updates.", e),
tsCallBackExecutor);
}
}
}