diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index de3864df9f..442ff1d11f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -31,6 +31,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; @@ -544,6 +545,11 @@ public class ActorSystemContext { @Getter private CalculatedFieldStateService calculatedFieldStateService; + @Lazy + @Autowired(required = false) + @Getter + private CalculatedFieldQueueService calculatedFieldQueueService; + @Lazy @Autowired(required = false) @Getter diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index d162b6a09a..a5b43d3223 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -23,6 +23,7 @@ import org.bouncycastle.util.Arrays; import org.thingsboard.common.util.DebugModeUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.RuleEngineAlarmService; @@ -902,6 +903,11 @@ public class DefaultTbContext implements TbContext { return mainCtx.getCalculatedFieldService(); } + @Override + public CalculatedFieldQueueService getCalculatedFieldQueueService() { + return mainCtx.getCalculatedFieldQueueService(); + } + @Override public boolean isExternalNodeForceAck() { return mainCtx.isExternalNodeForceAck(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index 3ca0b220ff..c1ffb10e79 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; @@ -47,6 +48,7 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Set; @@ -89,6 +91,14 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } + @Override + public void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback callback) { + var tenantId = request.getTenantId(); + var entityId = request.getEntityId(); + checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()), + () -> toCalculatedFieldTelemetryMsgProto(request), callback); + } + @Override public void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback) { var tenantId = request.getTenantId(); @@ -97,6 +107,14 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } + @Override + public void pushRequestToQueue(AttributesSaveRequest request, FutureCallback callback) { + var tenantId = request.getTenantId(); + var entityId = request.getEntityId(); + checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()), + () -> toCalculatedFieldTelemetryMsgProto(request), callback); + } + @Override public void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback) { var tenantId = request.getTenantId(); @@ -152,22 +170,34 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS }; } + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request) { + return toCalculatedFieldTelemetryMsgProto(request, null); + } + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); - List entries = request.getEntries(); - List versions = result.getVersions(); - for (int i = 0; i < entries.size(); i++) { - long tsVersion = versions.get(i); - TsKvProto tsProto = toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build(); - telemetryMsg.addTsData(tsProto); - } - msg.setTelemetryMsg(telemetryMsg.build()); + List entries = request.getEntries(); + List versions = result != null ? result.getVersions() : Collections.emptyList(); + + for (int i = 0; i < entries.size(); i++) { + TsKvProto.Builder tsProtoBuilder = toTsKvProto(entries.get(i)).toBuilder(); + if (result != null) { + tsProtoBuilder.setVersion(versions.get(i)); + } + telemetryMsg.addTsData(tsProtoBuilder.build()); + } + + msg.setTelemetryMsg(telemetryMsg.build()); return msg.build(); } + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request) { + return toCalculatedFieldTelemetryMsgProto(request, null); + } + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List versions) { ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); @@ -175,9 +205,11 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name())); List entries = request.getEntries(); for (int i = 0; i < entries.size(); i++) { - long attrVersion = versions.get(i); - AttributeValueProto attrProto = ProtoUtils.toProto(entries.get(i)).toBuilder().setVersion(attrVersion).build(); - telemetryMsg.addAttrData(attrProto); + AttributeValueProto.Builder attrProtoBuilder = ProtoUtils.toProto(entries.get(i)).toBuilder(); + if (versions != null) { + attrProtoBuilder.setVersion(versions.get(i)); + } + telemetryMsg.addAttrData(attrProtoBuilder.build()); } msg.setTelemetryMsg(telemetryMsg.build()); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java index a237f3d022..bfe9eed24f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java @@ -42,13 +42,17 @@ public class SingleValueArgumentEntry implements ArgumentEntry { public SingleValueArgumentEntry(TsKvProto entry) { this.ts = entry.getTs(); - this.version = entry.getVersion(); + if (entry.hasVersion()) { + this.version = entry.getVersion(); + } this.kvEntryValue = ProtoUtils.fromProto(entry.getKv()); } public SingleValueArgumentEntry(AttributeValueProto entry) { this.ts = entry.getLastUpdateTs(); - this.version = entry.getVersion(); + if (entry.hasVersion()) { + this.version = entry.getVersion(); + } this.kvEntryValue = ProtoUtils.basicKvEntryFromProto(entry); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 2bdbd917ab..31ae9e8055 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -31,6 +31,7 @@ import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; @@ -50,7 +51,6 @@ import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.util.KvUtils; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; -import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index b8bfa13e50..b47bea18d8 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.rule.engine.api.CalculatedFieldQueueService; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; @@ -56,7 +57,6 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; -import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.subscription.SubscriptionManagerService; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/CalculatedFieldQueueService.java similarity index 83% rename from application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java rename to rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/CalculatedFieldQueueService.java index b688993a8d..b4eb24baea 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/CalculatedFieldQueueService.java @@ -13,13 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.cf; +package org.thingsboard.rule.engine.api; import com.google.common.util.concurrent.FutureCallback; -import org.thingsboard.rule.engine.api.AttributesDeleteRequest; -import org.thingsboard.rule.engine.api.AttributesSaveRequest; -import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; -import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import java.util.List; @@ -34,8 +30,12 @@ public interface CalculatedFieldQueueService { */ void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback); + void pushRequestToQueue(TimeseriesSaveRequest request, FutureCallback callback); + void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); + void pushRequestToQueue(AttributesSaveRequest request, FutureCallback callback); + void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback); void pushRequestToQueue(TimeseriesDeleteRequest request, List result, FutureCallback callback); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 6ba4eae0e4..1abad50a8d 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -360,6 +360,8 @@ public interface TbContext { CalculatedFieldService getCalculatedFieldService(); + CalculatedFieldQueueService getCalculatedFieldQueueService(); + boolean isExternalNodeForceAck(); /** diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java new file mode 100644 index 0000000000..ac0e61d9be --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java @@ -0,0 +1,212 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.FutureCallback; +import com.google.gson.JsonParser; +import jakarta.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; +import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.adaptor.JsonConverter; +import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.thingsboard.server.common.data.DataConstants.SCOPE; +import static org.thingsboard.server.common.data.msg.TbMsgType.ATTRIBUTES_DELETED; +import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST; +import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST; +import static org.thingsboard.server.common.data.msg.TbMsgType.TIMESERIES_DELETED; + +@Slf4j +@RuleNode( + type = ComponentType.ACTION, + name = "push to calculated fields", + configClazz = EmptyNodeConfiguration.class, + nodeDescription = "Pushes messages to the calculated fields for further processing", + nodeDetails = "Forwards incoming messages to the calculated fields, where they will be processed to compute values based on predefined calculation rules without persisting any data in the database.", + configDirective = "tbNodeEmptyConfig", + icon = "send" +) +public class TbCalculatedFieldsNode implements TbNode { + + private EmptyNodeConfiguration config; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, EmptyNodeConfiguration.class); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + if (!msg.isTypeOneOf(POST_ATTRIBUTES_REQUEST, POST_TELEMETRY_REQUEST, ATTRIBUTES_DELETED, TIMESERIES_DELETED)) { + ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); + return; + } + + String src = msg.getData(); + + if (msg.isTypeOf(POST_TELEMETRY_REQUEST)) { + processPostTelemetryRequest(ctx, msg, src); + } else if (msg.isTypeOf(POST_ATTRIBUTES_REQUEST)) { + processPostAttributesRequest(ctx, msg, src); + } else if (msg.isTypeOf(TIMESERIES_DELETED)) { + processTimeSeriesDeleted(ctx, msg, src); + } else { + processAttributesDeleted(ctx, msg, src); + } + + } + + private void processPostTelemetryRequest(TbContext ctx, TbMsg msg, String src) { + Map> tsKvMap = JsonConverter.convertToTelemetry(JsonParser.parseString(src), System.currentTimeMillis()); + if (tsKvMap.isEmpty()) { + ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + src)); + return; + } + + List tsKvEntryList = new ArrayList<>(); + for (Map.Entry> tsKvEntry : tsKvMap.entrySet()) { + for (KvEntry kvEntry : tsKvEntry.getValue()) { + tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry)); + } + } + + TimeseriesSaveRequest timeseriesSaveRequest = TimeseriesSaveRequest.builder() + .tenantId(ctx.getTenantId()) + .customerId(msg.getCustomerId()) + .entityId(msg.getOriginator()) + .entries(tsKvEntryList) + .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) + .tbMsgId(msg.getId()) + .tbMsgType(msg.getInternalType()) + .callback(new TelemetryNodeCallback(ctx, msg)) + .build(); + + ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesSaveRequest, timeseriesSaveRequest.getCallback()); + } + + private void processPostAttributesRequest(TbContext ctx, TbMsg msg, String src) { + List newAttributes = new ArrayList<>(JsonConverter.convertToAttributes(JsonParser.parseString(src))); + + if (newAttributes.isEmpty()) { + ctx.tellSuccess(msg); + return; + } + + AttributesSaveRequest attributesSaveRequest = AttributesSaveRequest.builder() + .tenantId(ctx.getTenantId()) + .entityId(msg.getOriginator()) + .scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE))) + .entries(newAttributes) + .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) + .tbMsgId(msg.getId()) + .tbMsgType(msg.getInternalType()) + .callback(new TelemetryNodeCallback(ctx, msg)) + .build(); + ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback()); + } + + private void processTimeSeriesDeleted(TbContext ctx, TbMsg msg, String src) { + JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData()); + List keysToDelete = JacksonUtil.convertValue(dataJson.get("timeSeries"), new TypeReference<>() { + }); + + if (keysToDelete.isEmpty()) { + ctx.tellSuccess(msg); + return; + } + + TimeseriesDeleteRequest timeseriesDeleteRequest = TimeseriesDeleteRequest.builder() + .tenantId(ctx.getTenantId()) + .entityId(msg.getOriginator()) + .keys(keysToDelete) + .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) + .tbMsgId(msg.getId()) + .tbMsgType(msg.getInternalType()) + .callback(new FutureCallback>() { + @Override + public void onSuccess(@Nullable List tmp) { + } + + @Override + public void onFailure(Throwable t) { + } + }) + .build(); + + ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesDeleteRequest, keysToDelete, getCalculatedFieldCallback(timeseriesDeleteRequest.getCallback(), keysToDelete)); + } + + private void processAttributesDeleted(TbContext ctx, TbMsg msg, String src) { + JsonNode dataJson = JacksonUtil.toJsonNode(msg.getData()); + List keysToDelete = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { + }); + + if (keysToDelete.isEmpty()) { + ctx.tellSuccess(msg); + return; + } + + AttributesDeleteRequest attributesDeleteRequest = AttributesDeleteRequest.builder() + .tenantId(ctx.getTenantId()) + .entityId(msg.getOriginator()) + .scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE))) + .keys(keysToDelete) + .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) + .tbMsgId(msg.getId()) + .tbMsgType(msg.getInternalType()) + .callback(new TelemetryNodeCallback(ctx, msg)) + .build(); + ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesDeleteRequest, keysToDelete, attributesDeleteRequest.getCallback()); + } + + private FutureCallback getCalculatedFieldCallback(FutureCallback> originalCallback, List keys) { + return new FutureCallback() { + @Override + public void onSuccess(Void unused) { + originalCallback.onSuccess(keys); + } + + @Override + public void onFailure(Throwable t) { + originalCallback.onFailure(t); + } + }; + } + +}