Refactor saveAndNotify with returned future

This commit is contained in:
ViacheslavKlimov 2024-12-17 16:11:34 +02:00
parent e8cf3179c5
commit 3db304e021
6 changed files with 64 additions and 58 deletions

View File

@ -19,7 +19,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import jakarta.annotation.Nullable;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
@ -127,14 +126,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
}
@Override
public ListenableFuture<Void> saveAndNotify(TimeseriesSaveRequest request) {
SettableFuture<Void> future = SettableFuture.create();
request.setCallback(new VoidFutureCallback(future));
save(request);
return future;
}
@Override
public ListenableFuture<Integer> saveInternal(TimeseriesSaveRequest request) {
TenantId tenantId = request.getTenantId();
@ -242,14 +233,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list));
}
@Override
public ListenableFuture<Void> saveAttrAndNotify(AttributesSaveRequest request) {
SettableFuture<Void> future = SettableFuture.create();
request.setCallback(new VoidFutureCallback(future));
save(request);
return future;
}
private void addEntityViewCallback(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) {
if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) {
Futures.addCallback(this.tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId),
@ -397,23 +380,4 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
};
}
private static class VoidFutureCallback implements FutureCallback<Void> {
private final SettableFuture<Void> future;
public VoidFutureCallback(SettableFuture<Void> future) {
this.future = future;
}
@Override
public void onSuccess(Void result) {
future.set(null);
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
}
}

View File

@ -16,10 +16,10 @@
package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.SettableFuture;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.EntityId;
@ -38,10 +38,9 @@ public class AttributesSaveRequest {
private final TenantId tenantId;
private final EntityId entityId;
private final AttributeScope scope;
private final List<AttributeKvEntry> entries; // todo: rename to attributes? same with timeseries
private final List<AttributeKvEntry> entries;
private final boolean notifyDevice;
@Setter
private FutureCallback<Void> callback;
private final FutureCallback<Void> callback;
public static Builder builder() {
return new Builder();
@ -106,6 +105,20 @@ public class AttributesSaveRequest {
return this;
}
public Builder future(SettableFuture<Void> future) {
return callback(new FutureCallback<>() {
@Override
public void onSuccess(Void result) {
future.set(result);
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
});
}
public AttributesSaveRequest build() {
return new AttributesSaveRequest(tenantId, entityId, scope, entries, notifyDevice, callback);
}

View File

@ -16,7 +16,6 @@
package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -33,14 +32,10 @@ public interface RuleEngineTelemetryService {
void save(TimeseriesSaveRequest request);
ListenableFuture<Void> saveAndNotify(TimeseriesSaveRequest request);
void save(AttributesSaveRequest request);
void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
ListenableFuture<Void> saveAttrAndNotify(AttributesSaveRequest request);
void deleteAndNotify(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> keys, FutureCallback<Void> callback);
void deleteAndNotify(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback);

View File

@ -16,10 +16,10 @@
package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.SettableFuture;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -39,8 +39,7 @@ public class TimeseriesSaveRequest {
private final List<TsKvEntry> entries;
private final long ttl;
private final boolean saveLatest;
@Setter
private FutureCallback<Void> callback;
private final FutureCallback<Void> callback;
public static Builder builder() {
return new Builder();
@ -101,6 +100,20 @@ public class TimeseriesSaveRequest {
return this;
}
public Builder future(SettableFuture<Void> future) {
return callback(new FutureCallback<>() {
@Override
public void onSuccess(Void result) {
future.set(result);
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
});
}
public TimeseriesSaveRequest build() {
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, callback);
}

View File

@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import net.objecthunter.exp4j.Expression;
import net.objecthunter.exp4j.ExpressionBuilder;
@ -143,11 +144,14 @@ public class TbMathNode implements TbNode {
private ListenableFuture<Void> saveTimeSeries(TbContext ctx, TbMsg msg, double result, TbMathResult mathResultDef) {
final BasicTsKvEntry basicTsKvEntry = new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry(mathResultDef.getKey(), result));
return ctx.getTelemetryService().saveAndNotify(TimeseriesSaveRequest.builder()
SettableFuture<Void> future = SettableFuture.create();
ctx.getTelemetryService().save(TimeseriesSaveRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(msg.getOriginator())
.entry(basicTsKvEntry)
.future(future)
.build());
return future;
}
private ListenableFuture<Void> saveAttribute(TbContext ctx, TbMsg msg, double result, TbMathResult mathResultDef) {
@ -160,12 +164,15 @@ public class TbMathNode implements TbNode {
var value = toDoubleValue(mathResultDef, result);
kvEntry = new DoubleDataEntry(mathResultDef.getKey(), value);
}
return ctx.getTelemetryService().saveAttrAndNotify(AttributesSaveRequest.builder()
SettableFuture<Void> future = SettableFuture.create();
ctx.getTelemetryService().save(AttributesSaveRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(msg.getOriginator())
.scope(attributeScope)
.entry(kvEntry)
.future(future)
.build());
return future;
}
private boolean isIntegerResult(TbMathResult mathResultDef, TbRuleNodeMathFunctionType function) {

View File

@ -30,14 +30,17 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.ThrowingConsumer;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.verification.Timeout;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
@ -76,6 +79,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@ -433,15 +437,17 @@ public class TbMathNodeTest {
);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 5).toString());
when(telemetryService.saveAttrAndNotify(any()))
.thenReturn(Futures.immediateFuture(null));
doAnswer(invocation -> {
AttributesSaveRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryService).save(any(AttributesSaveRequest.class));
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
verify(telemetryService, times(1)).saveAttrAndNotify(assertArg(request -> {
verify(telemetryService, times(1)).save(assertArg((ThrowingConsumer<AttributesSaveRequest>) request -> {
assertThat(request.getEntries()).singleElement().extracting(KvEntry::getValue).isInstanceOf(Double.class);
}));
@ -461,13 +467,17 @@ public class TbMathNodeTest {
);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 5).toString());
when(telemetryService.saveAndNotify(any())).thenReturn(Futures.immediateFuture(null));
doAnswer(invocation -> {
TimeseriesSaveRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryService).save(any(TimeseriesSaveRequest.class));
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
verify(telemetryService, times(1)).saveAndNotify(assertArg(request -> {
verify(telemetryService, times(1)).save(assertArg((ThrowingConsumer<TimeseriesSaveRequest>) request -> {
assertThat(request.getEntries()).size().isOne();
assertThat(request.isSaveLatest()).isTrue();
}));
@ -488,13 +498,17 @@ public class TbMathNodeTest {
);
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, originator, TbMsgMetaData.EMPTY, JacksonUtil.newObjectNode().put("a", 5).toString());
when(telemetryService.saveAndNotify(any())).thenReturn(Futures.immediateFuture(null));
doAnswer(invocation -> {
TimeseriesSaveRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryService).save(any(TimeseriesSaveRequest.class));
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
verify(telemetryService, times(1)).saveAndNotify(assertArg(request -> {
verify(telemetryService, times(1)).save(assertArg((ThrowingConsumer<TimeseriesSaveRequest>) request -> {
assertThat(request.getEntries()).size().isOne();
assertThat(request.isSaveLatest()).isTrue();
}));