DefaultTelemetryWebSocketService - on subscribe response order fixed (will send response after subscription service called)

This commit is contained in:
Sergey Matvienko 2023-02-24 16:18:32 +01:00
parent 433a5dcbcf
commit f706fbe784

View File

@ -450,7 +450,6 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
@Override
public void onSuccess(List<AttributeKvEntry> data) {
List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
Map<String, Long> subState = new HashMap<>(keys.size());
keys.forEach(key -> subState.put(key, 0L));
@ -470,6 +469,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
.updateConsumer(DefaultTelemetryWebSocketService.this::sendWsMsg)
.build();
oldSubService.addSubscription(sub);
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
}
@Override
@ -550,7 +550,6 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
@Override
public void onSuccess(List<AttributeKvEntry> data) {
List<TsKvEntry> attributesData = data.stream().map(d -> new BasicTsKvEntry(d.getLastUpdateTs(), d)).collect(Collectors.toList());
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
Map<String, Long> subState = new HashMap<>(attributesData.size());
attributesData.forEach(v -> subState.put(v.getKey(), v.getTs()));
@ -568,6 +567,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
.updateConsumer(DefaultTelemetryWebSocketService.this::sendWsMsg)
.scope(scope).build();
oldSubService.addSubscription(sub);
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
}
@Override
@ -636,7 +636,6 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
FutureCallback<List<TsKvEntry>> callback = new FutureCallback<List<TsKvEntry>>() {
@Override
public void onSuccess(List<TsKvEntry> data) {
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
Map<String, Long> subState = new HashMap<>(data.size());
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
@ -650,6 +649,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
.allKeys(true)
.keyStates(subState).build();
oldSubService.addSubscription(sub);
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
}
@Override
@ -673,7 +673,6 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
return new FutureCallback<>() {
@Override
public void onSuccess(List<TsKvEntry> data) {
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
Map<String, Long> subState = new HashMap<>(keys.size());
keys.forEach(key -> subState.put(key, startTs));
data.forEach(v -> subState.put(v.getKey(), v.getTs()));
@ -688,6 +687,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
.allKeys(false)
.keyStates(subState).build();
oldSubService.addSubscription(sub);
sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
}
@Override