diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index bc78139d2f..793aabc6cc 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -19,7 +19,6 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import jakarta.annotation.Nullable; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -127,14 +126,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } - @Override - public ListenableFuture saveAndNotify(TimeseriesSaveRequest request) { - SettableFuture future = SettableFuture.create(); - request.setCallback(new VoidFutureCallback(future)); - save(request); - return future; - } - @Override public ListenableFuture saveInternal(TimeseriesSaveRequest request) { TenantId tenantId = request.getTenantId(); @@ -242,14 +233,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list)); } - @Override - public ListenableFuture saveAttrAndNotify(AttributesSaveRequest request) { - SettableFuture future = SettableFuture.create(); - request.setCallback(new VoidFutureCallback(future)); - save(request); - return future; - } - private void addEntityViewCallback(TenantId tenantId, EntityId entityId, List ts) { if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) { Futures.addCallback(this.tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), @@ -397,23 +380,4 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer }; } - private static class VoidFutureCallback implements FutureCallback { - private final SettableFuture future; - - public VoidFutureCallback(SettableFuture future) { - this.future = future; - } - - @Override - public void onSuccess(Void result) { - future.set(null); - } - - @Override - public void onFailure(Throwable t) { - future.setException(t); - } - - } - } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java index c1b12081f4..22fa8de6de 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java @@ -16,10 +16,10 @@ package org.thingsboard.rule.engine.api; import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.SettableFuture; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; -import lombok.Setter; import lombok.ToString; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.EntityId; @@ -38,10 +38,9 @@ public class AttributesSaveRequest { private final TenantId tenantId; private final EntityId entityId; private final AttributeScope scope; - private final List entries; // todo: rename to attributes? same with timeseries + private final List entries; private final boolean notifyDevice; - @Setter - private FutureCallback callback; + private final FutureCallback callback; public static Builder builder() { return new Builder(); @@ -106,6 +105,20 @@ public class AttributesSaveRequest { return this; } + public Builder future(SettableFuture future) { + return callback(new FutureCallback<>() { + @Override + public void onSuccess(Void result) { + future.set(result); + } + + @Override + public void onFailure(Throwable t) { + future.setException(t); + } + }); + } + public AttributesSaveRequest build() { return new AttributesSaveRequest(tenantId, entityId, scope, entries, notifyDevice, callback); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index bdc966ad88..420c0c659b 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -16,7 +16,6 @@ package org.thingsboard.rule.engine.api; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -33,14 +32,10 @@ public interface RuleEngineTelemetryService { void save(TimeseriesSaveRequest request); - ListenableFuture saveAndNotify(TimeseriesSaveRequest request); - void save(AttributesSaveRequest request); void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback); - ListenableFuture saveAttrAndNotify(AttributesSaveRequest request); - void deleteAndNotify(TenantId tenantId, EntityId entityId, AttributeScope scope, List keys, FutureCallback callback); void deleteAndNotify(TenantId tenantId, EntityId entityId, AttributeScope scope, List keys, boolean notifyDevice, FutureCallback callback); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java index de19c2b88e..57fba9232f 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java @@ -16,10 +16,10 @@ package org.thingsboard.rule.engine.api; import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.SettableFuture; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; -import lombok.Setter; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -39,8 +39,7 @@ public class TimeseriesSaveRequest { private final List entries; private final long ttl; private final boolean saveLatest; - @Setter - private FutureCallback callback; + private final FutureCallback callback; public static Builder builder() { return new Builder(); @@ -101,6 +100,20 @@ public class TimeseriesSaveRequest { return this; } + public Builder future(SettableFuture future) { + return callback(new FutureCallback<>() { + @Override + public void onSuccess(Void result) { + future.set(result); + } + + @Override + public void onFailure(Throwable t) { + future.setException(t); + } + }); + } + public TimeseriesSaveRequest build() { return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, callback); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java index 6f61de3fd3..1d18290129 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import net.objecthunter.exp4j.Expression; import net.objecthunter.exp4j.ExpressionBuilder; @@ -143,11 +144,14 @@ public class TbMathNode implements TbNode { private ListenableFuture saveTimeSeries(TbContext ctx, TbMsg msg, double result, TbMathResult mathResultDef) { final BasicTsKvEntry basicTsKvEntry = new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry(mathResultDef.getKey(), result)); - return ctx.getTelemetryService().saveAndNotify(TimeseriesSaveRequest.builder() + SettableFuture future = SettableFuture.create(); + ctx.getTelemetryService().save(TimeseriesSaveRequest.builder() .tenantId(ctx.getTenantId()) .entityId(msg.getOriginator()) .entry(basicTsKvEntry) + .future(future) .build()); + return future; } private ListenableFuture saveAttribute(TbContext ctx, TbMsg msg, double result, TbMathResult mathResultDef) { @@ -160,12 +164,15 @@ public class TbMathNode implements TbNode { var value = toDoubleValue(mathResultDef, result); kvEntry = new DoubleDataEntry(mathResultDef.getKey(), value); } - return ctx.getTelemetryService().saveAttrAndNotify(AttributesSaveRequest.builder() + SettableFuture future = SettableFuture.create(); + ctx.getTelemetryService().save(AttributesSaveRequest.builder() .tenantId(ctx.getTenantId()) .entityId(msg.getOriginator()) .scope(attributeScope) .entry(kvEntry) + .future(future) .build()); + return future; } private boolean isIntegerResult(TbMathResult mathResultDef, TbRuleNodeMathFunctionType function) { diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index 9104236be0..bec8946277 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -30,14 +30,17 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.ThrowingConsumer; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.verification.Timeout; import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.DeviceId; @@ -76,6 +79,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willReturn; import static org.mockito.BDDMockito.willThrow; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -433,15 +437,17 @@ public class TbMathNodeTest { ); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 5).toString()); - - when(telemetryService.saveAttrAndNotify(any())) - .thenReturn(Futures.immediateFuture(null)); + doAnswer(invocation -> { + AttributesSaveRequest request = invocation.getArgument(0); + request.getCallback().onSuccess(null); + return null; + }).when(telemetryService).save(any(AttributesSaveRequest.class)); node.onMsg(ctx, msg); ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture()); - verify(telemetryService, times(1)).saveAttrAndNotify(assertArg(request -> { + verify(telemetryService, times(1)).save(assertArg((ThrowingConsumer) request -> { assertThat(request.getEntries()).singleElement().extracting(KvEntry::getValue).isInstanceOf(Double.class); })); @@ -461,13 +467,17 @@ public class TbMathNodeTest { ); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 5).toString()); - when(telemetryService.saveAndNotify(any())).thenReturn(Futures.immediateFuture(null)); + doAnswer(invocation -> { + TimeseriesSaveRequest request = invocation.getArgument(0); + request.getCallback().onSuccess(null); + return null; + }).when(telemetryService).save(any(TimeseriesSaveRequest.class)); node.onMsg(ctx, msg); ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture()); - verify(telemetryService, times(1)).saveAndNotify(assertArg(request -> { + verify(telemetryService, times(1)).save(assertArg((ThrowingConsumer) request -> { assertThat(request.getEntries()).size().isOne(); assertThat(request.isSaveLatest()).isTrue(); })); @@ -488,13 +498,17 @@ public class TbMathNodeTest { ); TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 5).toString()); - when(telemetryService.saveAndNotify(any())).thenReturn(Futures.immediateFuture(null)); + doAnswer(invocation -> { + TimeseriesSaveRequest request = invocation.getArgument(0); + request.getCallback().onSuccess(null); + return null; + }).when(telemetryService).save(any(TimeseriesSaveRequest.class)); node.onMsg(ctx, msg); ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture()); - verify(telemetryService, times(1)).saveAndNotify(assertArg(request -> { + verify(telemetryService, times(1)).save(assertArg((ThrowingConsumer) request -> { assertThat(request.getEntries()).size().isOne(); assertThat(request.isSaveLatest()).isTrue(); }));