Save time series strategies: refactor boolean flags in TimeseriesSaveRequest.java to SaveActions nested record
This commit is contained in:
parent
f419a6e438
commit
39e47cd484
@ -348,9 +348,7 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
|
|||||||
.tenantId(entityView.getTenantId())
|
.tenantId(entityView.getTenantId())
|
||||||
.entityId(entityId)
|
.entityId(entityId)
|
||||||
.entries(latestValues)
|
.entries(latestValues)
|
||||||
.saveTimeseries(false)
|
.saveActions(TimeseriesSaveRequest.SaveActions.LATEST_AND_WS)
|
||||||
.saveLatest(true)
|
|
||||||
.sendWsUpdate(true)
|
|
||||||
.callback(new FutureCallback<Void>() {
|
.callback(new FutureCallback<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable Void tmp) {
|
public void onSuccess(@Nullable Void tmp) {
|
||||||
|
|||||||
@ -118,10 +118,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
EntityId entityId = request.getEntityId();
|
EntityId entityId = request.getEntityId();
|
||||||
checkInternalEntity(entityId);
|
checkInternalEntity(entityId);
|
||||||
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
|
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
|
||||||
if (sysTenant || !request.isSaveTimeseries() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
|
if (sysTenant || !request.getSaveActions().saveTimeseries() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
|
||||||
KvUtils.validate(request.getEntries(), valueNoXssValidation);
|
KvUtils.validate(request.getEntries(), valueNoXssValidation);
|
||||||
ListenableFuture<Integer> future = saveTimeseriesInternal(request);
|
ListenableFuture<Integer> future = saveTimeseriesInternal(request);
|
||||||
if (request.isSaveTimeseries()) {
|
if (request.getSaveActions().saveTimeseries()) {
|
||||||
FutureCallback<Integer> callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback());
|
FutureCallback<Integer> callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback());
|
||||||
Futures.addCallback(future, callback, tsCallBackExecutor);
|
Futures.addCallback(future, callback, tsCallBackExecutor);
|
||||||
}
|
}
|
||||||
@ -134,22 +134,23 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
public ListenableFuture<Integer> saveTimeseriesInternal(TimeseriesSaveRequest request) {
|
public ListenableFuture<Integer> saveTimeseriesInternal(TimeseriesSaveRequest request) {
|
||||||
TenantId tenantId = request.getTenantId();
|
TenantId tenantId = request.getTenantId();
|
||||||
EntityId entityId = request.getEntityId();
|
EntityId entityId = request.getEntityId();
|
||||||
|
TimeseriesSaveRequest.SaveActions saveActions = request.getSaveActions();
|
||||||
ListenableFuture<Integer> saveFuture;
|
ListenableFuture<Integer> saveFuture;
|
||||||
if (request.isSaveTimeseries() && request.isSaveLatest()) {
|
if (saveActions.saveTimeseries() && saveActions.saveLatest()) {
|
||||||
saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl());
|
saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl());
|
||||||
} else if (request.isSaveLatest()) {
|
} else if (saveActions.saveLatest()) {
|
||||||
saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor());
|
saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor());
|
||||||
} else if (request.isSaveTimeseries()) {
|
} else if (saveActions.saveTimeseries()) {
|
||||||
saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
|
saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
|
||||||
} else {
|
} else {
|
||||||
saveFuture = Futures.immediateFuture(0);
|
saveFuture = Futures.immediateFuture(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
addMainCallback(saveFuture, request.getCallback());
|
addMainCallback(saveFuture, request.getCallback());
|
||||||
if (request.isSendWsUpdate()) {
|
if (saveActions.sendWsUpdate()) {
|
||||||
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
|
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
|
||||||
}
|
}
|
||||||
if (request.isSaveLatest()) {
|
if (saveActions.saveLatest()) {
|
||||||
copyLatestToEntityViews(tenantId, entityId, request.getEntries());
|
copyLatestToEntityViews(tenantId, entityId, request.getEntries());
|
||||||
}
|
}
|
||||||
return saveFuture;
|
return saveFuture;
|
||||||
@ -236,9 +237,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
|||||||
.tenantId(tenantId)
|
.tenantId(tenantId)
|
||||||
.entityId(entityView.getId())
|
.entityId(entityView.getId())
|
||||||
.entries(entityViewLatest)
|
.entries(entityViewLatest)
|
||||||
.saveTimeseries(false)
|
.saveActions(TimeseriesSaveRequest.SaveActions.LATEST_AND_WS)
|
||||||
.saveLatest(true)
|
|
||||||
.sendWsUpdate(true)
|
|
||||||
.callback(new FutureCallback<>() {
|
.callback(new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable Void tmp) {}
|
public void onSuccess(@Nullable Void tmp) {}
|
||||||
|
|||||||
@ -93,9 +93,7 @@ class DefaultTbEntityViewServiceTest {
|
|||||||
.entityId(entityView.getId())
|
.entityId(entityView.getId())
|
||||||
.entries(latest)
|
.entries(latest)
|
||||||
.ttl(0L)
|
.ttl(0L)
|
||||||
.saveTimeseries(false)
|
.saveActions(TimeseriesSaveRequest.SaveActions.LATEST_AND_WS)
|
||||||
.saveLatest(true)
|
|
||||||
.sendWsUpdate(true)
|
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
var actualCopyLatestRequest = captor.getValue();
|
var actualCopyLatestRequest = captor.getValue();
|
||||||
|
|||||||
@ -173,9 +173,7 @@ class DefaultTelemetrySubscriptionServiceTest {
|
|||||||
.entityId(entityId)
|
.entityId(entityId)
|
||||||
.entries(sampleTelemetry)
|
.entries(sampleTelemetry)
|
||||||
.ttl(sampleTtl)
|
.ttl(sampleTtl)
|
||||||
.saveTimeseries(true)
|
.saveActions(new TimeseriesSaveRequest.SaveActions(true, false, false))
|
||||||
.saveLatest(false)
|
|
||||||
.sendWsUpdate(false)
|
|
||||||
.callback(emptyCallback)
|
.callback(emptyCallback)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
@ -195,9 +193,7 @@ class DefaultTelemetrySubscriptionServiceTest {
|
|||||||
.entityId(entityId)
|
.entityId(entityId)
|
||||||
.entries(sampleTelemetry)
|
.entries(sampleTelemetry)
|
||||||
.ttl(sampleTtl)
|
.ttl(sampleTtl)
|
||||||
.saveTimeseries(false)
|
.saveActions(TimeseriesSaveRequest.SaveActions.LATEST_AND_WS)
|
||||||
.saveLatest(true)
|
|
||||||
.sendWsUpdate(true)
|
|
||||||
.callback(emptyCallback)
|
.callback(emptyCallback)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
@ -220,9 +216,7 @@ class DefaultTelemetrySubscriptionServiceTest {
|
|||||||
.entityId(entityId)
|
.entityId(entityId)
|
||||||
.entries(sampleTelemetry)
|
.entries(sampleTelemetry)
|
||||||
.ttl(sampleTtl)
|
.ttl(sampleTtl)
|
||||||
.saveTimeseries(true)
|
.saveActions(TimeseriesSaveRequest.SaveActions.SAVE_ALL)
|
||||||
.saveLatest(true)
|
|
||||||
.sendWsUpdate(true)
|
|
||||||
.future(future)
|
.future(future)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
@ -248,9 +242,7 @@ class DefaultTelemetrySubscriptionServiceTest {
|
|||||||
.entityId(entityId)
|
.entityId(entityId)
|
||||||
.entries(sampleTelemetry)
|
.entries(sampleTelemetry)
|
||||||
.ttl(sampleTtl)
|
.ttl(sampleTtl)
|
||||||
.saveTimeseries(false)
|
.saveActions(TimeseriesSaveRequest.SaveActions.LATEST_AND_WS)
|
||||||
.saveLatest(true)
|
|
||||||
.sendWsUpdate(true)
|
|
||||||
.future(future)
|
.future(future)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
@ -283,9 +275,7 @@ class DefaultTelemetrySubscriptionServiceTest {
|
|||||||
.entityId(entityId)
|
.entityId(entityId)
|
||||||
.entries(sampleTelemetry)
|
.entries(sampleTelemetry)
|
||||||
.ttl(sampleTtl)
|
.ttl(sampleTtl)
|
||||||
.saveTimeseries(false)
|
.saveActions(new TimeseriesSaveRequest.SaveActions(false, true, false))
|
||||||
.saveLatest(true)
|
|
||||||
.sendWsUpdate(false)
|
|
||||||
.callback(emptyCallback)
|
.callback(emptyCallback)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
@ -312,9 +302,7 @@ class DefaultTelemetrySubscriptionServiceTest {
|
|||||||
.entityId(entityId)
|
.entityId(entityId)
|
||||||
.entries(sampleTelemetry)
|
.entries(sampleTelemetry)
|
||||||
.ttl(sampleTtl)
|
.ttl(sampleTtl)
|
||||||
.saveTimeseries(true)
|
.saveActions(new TimeseriesSaveRequest.SaveActions(true, false, false))
|
||||||
.saveLatest(false)
|
|
||||||
.sendWsUpdate(false)
|
|
||||||
.callback(emptyCallback)
|
.callback(emptyCallback)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
@ -340,9 +328,7 @@ class DefaultTelemetrySubscriptionServiceTest {
|
|||||||
.entityId(entityId)
|
.entityId(entityId)
|
||||||
.entries(sampleTelemetry)
|
.entries(sampleTelemetry)
|
||||||
.ttl(sampleTtl)
|
.ttl(sampleTtl)
|
||||||
.saveTimeseries(saveTimeseries)
|
.saveActions(new TimeseriesSaveRequest.SaveActions(saveTimeseries, saveLatest, sendWsUpdate))
|
||||||
.saveLatest(saveLatest)
|
|
||||||
.sendWsUpdate(sendWsUpdate)
|
|
||||||
.callback(emptyCallback)
|
.callback(emptyCallback)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|||||||
@ -38,11 +38,18 @@ public class TimeseriesSaveRequest {
|
|||||||
private final EntityId entityId;
|
private final EntityId entityId;
|
||||||
private final List<TsKvEntry> entries;
|
private final List<TsKvEntry> entries;
|
||||||
private final long ttl;
|
private final long ttl;
|
||||||
private final boolean saveTimeseries;
|
private final SaveActions saveActions;
|
||||||
private final boolean saveLatest;
|
|
||||||
private final boolean sendWsUpdate;
|
|
||||||
private final FutureCallback<Void> callback;
|
private final FutureCallback<Void> callback;
|
||||||
|
|
||||||
|
public record SaveActions(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) {
|
||||||
|
|
||||||
|
public static final SaveActions SAVE_ALL = new SaveActions(true, true, true);
|
||||||
|
public static final SaveActions WS_ONLY = new SaveActions(false, false, true);
|
||||||
|
public static final SaveActions LATEST_AND_WS = new SaveActions(false, true, true);
|
||||||
|
public static final SaveActions SKIP_ALL = new SaveActions(false, false, false);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public static Builder builder() {
|
public static Builder builder() {
|
||||||
return new Builder();
|
return new Builder();
|
||||||
}
|
}
|
||||||
@ -54,9 +61,7 @@ public class TimeseriesSaveRequest {
|
|||||||
private EntityId entityId;
|
private EntityId entityId;
|
||||||
private List<TsKvEntry> entries;
|
private List<TsKvEntry> entries;
|
||||||
private long ttl;
|
private long ttl;
|
||||||
private boolean saveTimeseries = true;
|
private SaveActions saveActions = SaveActions.SAVE_ALL;
|
||||||
private boolean saveLatest = true;
|
|
||||||
private boolean sendWsUpdate = true;
|
|
||||||
private FutureCallback<Void> callback;
|
private FutureCallback<Void> callback;
|
||||||
|
|
||||||
Builder() {}
|
Builder() {}
|
||||||
@ -94,18 +99,8 @@ public class TimeseriesSaveRequest {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder saveTimeseries(boolean saveTimeseries) {
|
public Builder saveActions(SaveActions settings) {
|
||||||
this.saveTimeseries = saveTimeseries;
|
this.saveActions = settings;
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder saveLatest(boolean saveLatest) {
|
|
||||||
this.saveLatest = saveLatest;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Builder sendWsUpdate(boolean sendWsUpdate) {
|
|
||||||
this.sendWsUpdate = sendWsUpdate;
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -129,7 +124,7 @@ public class TimeseriesSaveRequest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public TimeseriesSaveRequest build() {
|
public TimeseriesSaveRequest build() {
|
||||||
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveTimeseries, saveLatest, sendWsUpdate, callback);
|
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveActions, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,12 +22,30 @@ import static org.assertj.core.api.Assertions.assertThat;
|
|||||||
class TimeseriesSaveRequestTest {
|
class TimeseriesSaveRequestTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testBooleanFlagsDefaultToTrue() {
|
void testDefaultSaveActionsAreSaveAll() {
|
||||||
var request = TimeseriesSaveRequest.builder().build();
|
var request = TimeseriesSaveRequest.builder().build();
|
||||||
|
|
||||||
assertThat(request.isSaveTimeseries()).isTrue();
|
assertThat(request.getSaveActions()).isEqualTo(TimeseriesSaveRequest.SaveActions.SAVE_ALL);
|
||||||
assertThat(request.isSaveLatest()).isTrue();
|
}
|
||||||
assertThat(request.isSendWsUpdate()).isTrue();
|
|
||||||
|
@Test
|
||||||
|
void testSaveActionsSaveAll() {
|
||||||
|
assertThat(TimeseriesSaveRequest.SaveActions.SAVE_ALL).isEqualTo(new TimeseriesSaveRequest.SaveActions(true, true, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testSaveActionsWsOnly() {
|
||||||
|
assertThat(TimeseriesSaveRequest.SaveActions.WS_ONLY).isEqualTo(new TimeseriesSaveRequest.SaveActions(false, false, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testSaveActionsLatestAndWs() {
|
||||||
|
assertThat(TimeseriesSaveRequest.SaveActions.LATEST_AND_WS).isEqualTo(new TimeseriesSaveRequest.SaveActions(false, true, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testSaveActionsSkipAll() {
|
||||||
|
assertThat(TimeseriesSaveRequest.SaveActions.SKIP_ALL).isEqualTo(new TimeseriesSaveRequest.SaveActions(false, false, false));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -107,13 +107,10 @@ public class TbMsgTimeseriesNode implements TbNode {
|
|||||||
}
|
}
|
||||||
long ts = computeTs(msg, config.isUseServerTs());
|
long ts = computeTs(msg, config.isUseServerTs());
|
||||||
|
|
||||||
PersistenceDecision persistenceDecision = makePersistenceDecision(ts, msg.getOriginator().getId());
|
TimeseriesSaveRequest.SaveActions saveActions = determineSaveActions(ts, msg.getOriginator().getId());
|
||||||
boolean saveTimeseries = persistenceDecision.saveTimeseries();
|
|
||||||
boolean saveLatest = persistenceDecision.saveLatest();
|
|
||||||
boolean sendWsUpdate = persistenceDecision.sendWsUpdate();
|
|
||||||
|
|
||||||
// short-circuit
|
// short-circuit
|
||||||
if (!saveTimeseries && !saveLatest && !sendWsUpdate) {
|
if (!saveActions.saveTimeseries() && !saveActions.saveLatest() && !saveActions.sendWsUpdate()) {
|
||||||
ctx.tellSuccess(msg);
|
ctx.tellSuccess(msg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -141,9 +138,7 @@ public class TbMsgTimeseriesNode implements TbNode {
|
|||||||
.entityId(msg.getOriginator())
|
.entityId(msg.getOriginator())
|
||||||
.entries(tsKvEntryList)
|
.entries(tsKvEntryList)
|
||||||
.ttl(ttl)
|
.ttl(ttl)
|
||||||
.saveTimeseries(saveTimeseries)
|
.saveActions(saveActions)
|
||||||
.saveLatest(saveLatest)
|
|
||||||
.sendWsUpdate(sendWsUpdate)
|
|
||||||
.callback(new TelemetryNodeCallback(ctx, msg))
|
.callback(new TelemetryNodeCallback(ctx, msg))
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
@ -152,35 +147,26 @@ public class TbMsgTimeseriesNode implements TbNode {
|
|||||||
return ignoreMetadataTs ? System.currentTimeMillis() : msg.getMetaDataTs();
|
return ignoreMetadataTs ? System.currentTimeMillis() : msg.getMetaDataTs();
|
||||||
}
|
}
|
||||||
|
|
||||||
private record PersistenceDecision(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate) {}
|
private TimeseriesSaveRequest.SaveActions determineSaveActions(long ts, UUID originatorUuid) {
|
||||||
|
|
||||||
private PersistenceDecision makePersistenceDecision(long ts, UUID originatorUuid) {
|
|
||||||
boolean saveTimeseries;
|
|
||||||
boolean saveLatest;
|
|
||||||
boolean sendWsUpdate;
|
|
||||||
|
|
||||||
if (persistenceSettings instanceof OnEveryMessage) {
|
if (persistenceSettings instanceof OnEveryMessage) {
|
||||||
saveTimeseries = true;
|
return TimeseriesSaveRequest.SaveActions.SAVE_ALL;
|
||||||
saveLatest = true;
|
|
||||||
sendWsUpdate = true;
|
|
||||||
} else if (persistenceSettings instanceof WebSocketsOnly) {
|
|
||||||
saveTimeseries = false;
|
|
||||||
saveLatest = false;
|
|
||||||
sendWsUpdate = true;
|
|
||||||
} else if (persistenceSettings instanceof Deduplicate deduplicate) {
|
|
||||||
boolean isFirstMsgInInterval = deduplicate.getDeduplicateStrategy().shouldPersist(ts, originatorUuid);
|
|
||||||
saveTimeseries = isFirstMsgInInterval;
|
|
||||||
saveLatest = isFirstMsgInInterval;
|
|
||||||
sendWsUpdate = isFirstMsgInInterval;
|
|
||||||
} else if (persistenceSettings instanceof Advanced advanced) {
|
|
||||||
saveTimeseries = advanced.timeseries().shouldPersist(ts, originatorUuid);
|
|
||||||
saveLatest = advanced.latest().shouldPersist(ts, originatorUuid);
|
|
||||||
sendWsUpdate = advanced.webSockets().shouldPersist(ts, originatorUuid);
|
|
||||||
} else { // should not happen
|
|
||||||
throw new IllegalArgumentException("Unknown persistence settings type: " + persistenceSettings.getClass().getSimpleName());
|
|
||||||
}
|
}
|
||||||
|
if (persistenceSettings instanceof WebSocketsOnly) {
|
||||||
return new PersistenceDecision(saveTimeseries, saveLatest, sendWsUpdate);
|
return TimeseriesSaveRequest.SaveActions.WS_ONLY;
|
||||||
|
}
|
||||||
|
if (persistenceSettings instanceof Deduplicate deduplicate) {
|
||||||
|
boolean isFirstMsgInInterval = deduplicate.getDeduplicateStrategy().shouldPersist(ts, originatorUuid);
|
||||||
|
return isFirstMsgInInterval ? TimeseriesSaveRequest.SaveActions.SAVE_ALL : TimeseriesSaveRequest.SaveActions.SKIP_ALL;
|
||||||
|
}
|
||||||
|
if (persistenceSettings instanceof Advanced advanced) {
|
||||||
|
return new TimeseriesSaveRequest.SaveActions(
|
||||||
|
advanced.timeseries().shouldPersist(ts, originatorUuid),
|
||||||
|
advanced.latest().shouldPersist(ts, originatorUuid),
|
||||||
|
advanced.webSockets().shouldPersist(ts, originatorUuid)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// should not happen
|
||||||
|
throw new IllegalArgumentException("Unknown persistence settings type: " + persistenceSettings.getClass().getSimpleName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -533,7 +533,7 @@ public class TbMathNodeTest {
|
|||||||
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
||||||
verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> {
|
verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> {
|
||||||
assertThat(request.getEntries()).size().isOne();
|
assertThat(request.getEntries()).size().isOne();
|
||||||
assertThat(request.isSaveLatest()).isTrue();
|
assertThat(request.getSaveActions()).isEqualTo(TimeseriesSaveRequest.SaveActions.SAVE_ALL);
|
||||||
}));
|
}));
|
||||||
|
|
||||||
TbMsg resultMsg = msgCaptor.getValue();
|
TbMsg resultMsg = msgCaptor.getValue();
|
||||||
@ -569,7 +569,7 @@ public class TbMathNodeTest {
|
|||||||
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
|
||||||
verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> {
|
verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> {
|
||||||
assertThat(request.getEntries()).size().isOne();
|
assertThat(request.getEntries()).size().isOne();
|
||||||
assertThat(request.isSaveLatest()).isTrue();
|
assertThat(request.getSaveActions()).isEqualTo(TimeseriesSaveRequest.SaveActions.SAVE_ALL);
|
||||||
}));
|
}));
|
||||||
|
|
||||||
TbMsg resultMsg = msgCaptor.getValue();
|
TbMsg resultMsg = msgCaptor.getValue();
|
||||||
|
|||||||
@ -207,7 +207,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
|
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
|
||||||
assertThat(request.getEntries()).usingRecursiveFieldByFieldElementComparatorIgnoringFields("ts").containsExactlyElementsOf(expectedList);
|
assertThat(request.getEntries()).usingRecursiveFieldByFieldElementComparatorIgnoringFields("ts").containsExactlyElementsOf(expectedList);
|
||||||
assertThat(request.getTtl()).isEqualTo(extractTtlAsSeconds(tenantProfile));
|
assertThat(request.getTtl()).isEqualTo(extractTtlAsSeconds(tenantProfile));
|
||||||
assertThat(request.isSaveLatest()).isTrue();
|
assertThat(request.getSaveActions()).isEqualTo(TimeseriesSaveRequest.SaveActions.SAVE_ALL);
|
||||||
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
|
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
|
||||||
}));
|
}));
|
||||||
verify(ctxMock).tellSuccess(msg);
|
verify(ctxMock).tellSuccess(msg);
|
||||||
@ -264,9 +264,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
|
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
|
||||||
assertThat(request.getEntries()).containsExactlyElementsOf(expectedList);
|
assertThat(request.getEntries()).containsExactlyElementsOf(expectedList);
|
||||||
assertThat(request.getTtl()).isEqualTo(config.getDefaultTTL());
|
assertThat(request.getTtl()).isEqualTo(config.getDefaultTTL());
|
||||||
assertThat(request.isSaveTimeseries()).isTrue();
|
assertThat(request.getSaveActions()).isEqualTo(new TimeseriesSaveRequest.SaveActions(true, false, true));
|
||||||
assertThat(request.isSaveLatest()).isFalse();
|
|
||||||
assertThat(request.isSendWsUpdate()).isTrue();
|
|
||||||
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
|
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
|
||||||
}));
|
}));
|
||||||
verify(ctxMock).tellSuccess(msg);
|
verify(ctxMock).tellSuccess(msg);
|
||||||
@ -305,7 +303,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
assertThat(request.getCustomerId()).isNull();
|
assertThat(request.getCustomerId()).isNull();
|
||||||
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
|
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
|
||||||
assertThat(request.getTtl()).isEqualTo(expectedTtl);
|
assertThat(request.getTtl()).isEqualTo(expectedTtl);
|
||||||
assertThat(request.isSaveLatest()).isTrue();
|
assertThat(request.getSaveActions()).isEqualTo(TimeseriesSaveRequest.SaveActions.SAVE_ALL);
|
||||||
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
|
assertThat(request.getCallback()).isInstanceOf(TelemetryNodeCallback.class);
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
@ -354,9 +352,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
.entityId(msg.getOriginator())
|
.entityId(msg.getOriginator())
|
||||||
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
|
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
|
||||||
.ttl(extractTtlAsSeconds(tenantProfile))
|
.ttl(extractTtlAsSeconds(tenantProfile))
|
||||||
.saveTimeseries(true)
|
.saveActions(TimeseriesSaveRequest.SaveActions.SAVE_ALL)
|
||||||
.saveLatest(true)
|
|
||||||
.sendWsUpdate(true)
|
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
node.onMsg(ctxMock, msg);
|
node.onMsg(ctxMock, msg);
|
||||||
@ -391,9 +387,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
.entityId(msg.getOriginator())
|
.entityId(msg.getOriginator())
|
||||||
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
|
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
|
||||||
.ttl(extractTtlAsSeconds(tenantProfile))
|
.ttl(extractTtlAsSeconds(tenantProfile))
|
||||||
.saveTimeseries(true)
|
.saveActions(TimeseriesSaveRequest.SaveActions.SAVE_ALL)
|
||||||
.saveLatest(true)
|
|
||||||
.sendWsUpdate(true)
|
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
node.onMsg(ctxMock, msg);
|
node.onMsg(ctxMock, msg);
|
||||||
@ -428,9 +422,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
.entityId(msg.getOriginator())
|
.entityId(msg.getOriginator())
|
||||||
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
|
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
|
||||||
.ttl(extractTtlAsSeconds(tenantProfile))
|
.ttl(extractTtlAsSeconds(tenantProfile))
|
||||||
.saveTimeseries(false)
|
.saveActions(TimeseriesSaveRequest.SaveActions.WS_ONLY)
|
||||||
.saveLatest(false)
|
|
||||||
.sendWsUpdate(true)
|
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
node.onMsg(ctxMock, msg);
|
node.onMsg(ctxMock, msg);
|
||||||
@ -469,9 +461,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
.entityId(msg.getOriginator())
|
.entityId(msg.getOriginator())
|
||||||
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
|
.entry(new BasicTsKvEntry(123L, new DoubleDataEntry("temperature", 22.3)))
|
||||||
.ttl(extractTtlAsSeconds(tenantProfile))
|
.ttl(extractTtlAsSeconds(tenantProfile))
|
||||||
.saveTimeseries(true)
|
.saveActions(TimeseriesSaveRequest.SaveActions.SAVE_ALL)
|
||||||
.saveLatest(true)
|
|
||||||
.sendWsUpdate(true)
|
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
node.onMsg(ctxMock, msg);
|
node.onMsg(ctxMock, msg);
|
||||||
@ -508,11 +498,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts1))))
|
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts1))))
|
||||||
.build());
|
.build());
|
||||||
then(telemetryServiceMock).should().saveTimeseries(assertArg(
|
then(telemetryServiceMock).should().saveTimeseries(assertArg(
|
||||||
actualSaveRequest -> {
|
actualSaveRequest -> assertThat(actualSaveRequest.getSaveActions()).isEqualTo(TimeseriesSaveRequest.SaveActions.SAVE_ALL)
|
||||||
assertThat(actualSaveRequest.isSaveTimeseries()).isTrue();
|
|
||||||
assertThat(actualSaveRequest.isSaveLatest()).isTrue();
|
|
||||||
assertThat(actualSaveRequest.isSendWsUpdate()).isTrue();
|
|
||||||
}
|
|
||||||
));
|
));
|
||||||
|
|
||||||
clearInvocations(telemetryServiceMock);
|
clearInvocations(telemetryServiceMock);
|
||||||
@ -524,11 +510,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts2))))
|
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts2))))
|
||||||
.build());
|
.build());
|
||||||
then(telemetryServiceMock).should().saveTimeseries(assertArg(
|
then(telemetryServiceMock).should().saveTimeseries(assertArg(
|
||||||
actualSaveRequest -> {
|
actualSaveRequest -> assertThat(actualSaveRequest.getSaveActions()).isEqualTo(new TimeseriesSaveRequest.SaveActions(true, false, false))
|
||||||
assertThat(actualSaveRequest.isSaveTimeseries()).isTrue();
|
|
||||||
assertThat(actualSaveRequest.isSaveLatest()).isFalse();
|
|
||||||
assertThat(actualSaveRequest.isSendWsUpdate()).isFalse();
|
|
||||||
}
|
|
||||||
));
|
));
|
||||||
|
|
||||||
clearInvocations(telemetryServiceMock);
|
clearInvocations(telemetryServiceMock);
|
||||||
@ -540,11 +522,7 @@ public class TbMsgTimeseriesNodeTest extends AbstractRuleNodeUpgradeTest {
|
|||||||
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts3))))
|
.metaData(new TbMsgMetaData(Map.of("ts", Long.toString(ts3))))
|
||||||
.build());
|
.build());
|
||||||
then(telemetryServiceMock).should().saveTimeseries(assertArg(
|
then(telemetryServiceMock).should().saveTimeseries(assertArg(
|
||||||
actualSaveRequest -> {
|
actualSaveRequest -> assertThat(actualSaveRequest.getSaveActions()).isEqualTo(new TimeseriesSaveRequest.SaveActions(true, true, false))
|
||||||
assertThat(actualSaveRequest.isSaveTimeseries()).isTrue();
|
|
||||||
assertThat(actualSaveRequest.isSaveLatest()).isTrue();
|
|
||||||
assertThat(actualSaveRequest.isSendWsUpdate()).isFalse();
|
|
||||||
}
|
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user