From 0fb9f21e7867ac9991d7c4bf18d6b8397a944a40 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Mon, 2 Apr 2018 21:09:34 +0300 Subject: [PATCH] Telmetry plugin removed --- .../actors/ruleChain/DefaultTbContext.java | 15 +++- .../controller/TelemetryController.java | 28 +++--- .../controller/plugin/TbWebSocketHandler.java | 4 +- .../TelemetrySubscriptionService.java | 16 +--- .../api/RuleEngineTelemetryService.java | 21 +++++ .../rule/engine/api/TbContext.java | 2 + rule-engine/rule-engine-components/pom.xml | 9 ++ .../rule/engine/debug/TbMsgGeneratorNode.java | 34 +++++--- .../TbMsgGeneratorNodeConfiguration.java | 4 + .../engine/telemetry/TbMsgTelemetryNode.java | 87 +++++++++++++++++++ .../TbMsgTelemetryNodeConfiguration.java | 34 ++++++++ .../telemetry/TelemetryNodeCallback.java | 27 ++++++ 12 files changed, 238 insertions(+), 43 deletions(-) create mode 100644 rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNode.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNodeConfiguration.java create mode 100644 rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 7dca2c8409..ae60308833 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,8 +16,10 @@ package org.thingsboard.server.actors.ruleChain; import akka.actor.ActorRef; +import akka.actor.Cancellable; import com.google.common.base.Function; import org.thingsboard.rule.engine.api.ListeningExecutor; +import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.id.RuleNodeId; @@ -164,6 +166,11 @@ class DefaultTbContext implements TbContext { return mainCtx.getTsService(); } + @Override + public RuleEngineTelemetryService getTelemetryService() { + return mainCtx.getTsSubService(); + } + @Override public RelationService getRelationService() { return mainCtx.getRelationService(); diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index c1338e4c58..25f09cae65 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -115,7 +115,7 @@ public class TelemetryController extends BaseController { } @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") - @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES", method = RequestMethod.GET) + @RequestMapping(value = "/{entityType}/{entityId}/keys/attributes", method = RequestMethod.GET) @ResponseBody public DeferredResult getAttributeKeys( @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException { @@ -123,7 +123,7 @@ public class TelemetryController extends BaseController { } @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") - @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES/{scope}", method = RequestMethod.GET) + @RequestMapping(value = "/{entityType}/{entityId}/keys/attributes/{scope}", method = RequestMethod.GET) @ResponseBody public DeferredResult getAttributeKeysByScope( @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr @@ -133,7 +133,7 @@ public class TelemetryController extends BaseController { } @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") - @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES", method = RequestMethod.GET) + @RequestMapping(value = "/{entityType}/{entityId}/values/attributes", method = RequestMethod.GET) @ResponseBody public DeferredResult getAttributes( @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, @@ -144,7 +144,7 @@ public class TelemetryController extends BaseController { } @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") - @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES/{scope}", method = RequestMethod.GET) + @RequestMapping(value = "/{entityType}/{entityId}/values/attributes/{scope}", method = RequestMethod.GET) @ResponseBody public DeferredResult getAttributesByScope( @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, @@ -156,7 +156,7 @@ public class TelemetryController extends BaseController { } @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") - @RequestMapping(value = "/{entityType}/{entityId}/keys/TIMESERIES", method = RequestMethod.GET) + @RequestMapping(value = "/{entityType}/{entityId}/keys/timeseries", method = RequestMethod.GET) @ResponseBody public DeferredResult getTimeseriesKeys( @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException { @@ -167,7 +167,7 @@ public class TelemetryController extends BaseController { } @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") - @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET) + @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET) @ResponseBody public DeferredResult getLatestTimeseries( @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, @@ -180,7 +180,7 @@ public class TelemetryController extends BaseController { @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") - @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET) + @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET, params = {"keys", "startTs", "endTs"}) @ResponseBody public DeferredResult getTimeseries( @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, @@ -222,7 +222,7 @@ public class TelemetryController extends BaseController { } @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") - @RequestMapping(value = "/{entityType}/{entityId}/ATTRIBUTES/{scope}", method = RequestMethod.POST) + @RequestMapping(value = "/{entityType}/{entityId}/attributes/{scope}", method = RequestMethod.POST) @ResponseBody public DeferredResult saveEntityAttributesV2(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, @PathVariable("scope") String scope, @@ -232,7 +232,7 @@ public class TelemetryController extends BaseController { } @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") - @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}", method = RequestMethod.POST) + @RequestMapping(value = "/{entityType}/{entityId}/timeseries/{scope}", method = RequestMethod.POST) @ResponseBody public DeferredResult saveEntityTelemetry(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, @PathVariable("scope") String scope, @@ -242,7 +242,7 @@ public class TelemetryController extends BaseController { } @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") - @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}/{ttl}", method = RequestMethod.POST) + @RequestMapping(value = "/{entityType}/{entityId}/timeseries/{scope}/{ttl}", method = RequestMethod.POST) @ResponseBody public DeferredResult saveEntityTelemetryWithTTL(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, @PathVariable("scope") String scope, @PathVariable("ttl") Long ttl, diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index ba5f9f96fc..42d6a20d56 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -155,7 +155,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements PluginWe if (internalId != null) { SessionMetaData sessionMd = internalSessionMap.get(internalId); if (sessionMd != null) { - sessionMd.session.sendMessage(new TextMessage(msg)); + synchronized (sessionMd) { + sessionMd.session.sendMessage(new TextMessage(msg)); + } } else { log.warn("[{}][{}] Failed to find session by internal id", externalId, internalId); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java index 7bf223f83f..923d06bc7f 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java @@ -15,21 +15,14 @@ */ package org.thingsboard.server.service.telemetry; -import com.google.common.util.concurrent.FutureCallback; +import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.KvEntry; -import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** * Created by ashvayka on 27.03.18. */ -public interface TelemetrySubscriptionService { +public interface TelemetrySubscriptionService extends RuleEngineTelemetryService { void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub); @@ -37,9 +30,4 @@ public interface TelemetrySubscriptionService { void removeSubscription(String sessionId, int cmdId); - void saveAndNotify(EntityId entityId, List ts, FutureCallback callback); - - void saveAndNotify(EntityId entityId, List ts, long ttl, FutureCallback callback); - - void saveAndNotify(EntityId entityId, String scope, List attributes, FutureCallback callback); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java new file mode 100644 index 0000000000..253a2aff24 --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -0,0 +1,21 @@ +package org.thingsboard.rule.engine.api; + +import com.google.common.util.concurrent.FutureCallback; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; + +import java.util.List; + +/** + * Created by ashvayka on 02.04.18. + */ +public interface RuleEngineTelemetryService { + + void saveAndNotify(EntityId entityId, List ts, FutureCallback callback); + + void saveAndNotify(EntityId entityId, List ts, long ttl, FutureCallback callback); + + void saveAndNotify(EntityId entityId, String scope, List attributes, FutureCallback callback); + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 44bd3f56c9..c4514a8bc1 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -81,6 +81,8 @@ public interface TbContext { RuleChainService getRuleChainService(); + RuleEngineTelemetryService getTelemetryService(); + TimeseriesService getTimeseriesService(); RelationService getRelationService(); diff --git a/rule-engine/rule-engine-components/pom.xml b/rule-engine/rule-engine-components/pom.xml index a97493bdd7..3deea9b180 100644 --- a/rule-engine/rule-engine-components/pom.xml +++ b/rule-engine/rule-engine-components/pom.xml @@ -43,6 +43,11 @@ dao provided + + org.thingsboard.common + transport + provided + ch.qos.logback logback-core @@ -88,6 +93,10 @@ mockito-all test + + org.thingsboard.common + transport + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java index 43851a3c86..529420387e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNode.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.debug; import com.datastax.driver.core.utils.UUIDs; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; import org.thingsboard.rule.engine.TbNodeUtils; import org.thingsboard.rule.engine.api.ListeningExecutor; import org.thingsboard.rule.engine.api.RuleNode; @@ -26,6 +27,8 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.filter.TbJsFilterNodeConfiguration; import org.thingsboard.rule.engine.js.NashornJsEngine; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -33,6 +36,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; import javax.script.Bindings; import java.nio.charset.StandardCharsets; +import java.util.UUID; import java.util.concurrent.TimeUnit; import static org.thingsboard.rule.engine.DonAsynchron.withCallback; @@ -53,30 +57,40 @@ public class TbMsgGeneratorNode implements TbNode { private TbMsgGeneratorNodeConfiguration config; private long delay; + private EntityId originatorId; + private UUID nextTickId; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbMsgGeneratorNodeConfiguration.class); this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds()); - ctx.tellSelf(newTickMsg(ctx), delay); + if (!StringUtils.isEmpty(config.getOriginatorId())) { + originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId()); + } else { + originatorId = ctx.getSelfId(); + } + sentTickMsg(ctx); } @Override public void onMsg(TbContext ctx, TbMsg msg) { - if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG)) { + if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { TbMsgMetaData metaData = new TbMsgMetaData(); if (config.getMsgMetaData() != null) { config.getMsgMetaData().forEach(metaData::putValue); } - ctx.tellNext(new TbMsg(UUIDs.timeBased(), config.getMsgType(), ctx.getSelfId(), metaData, config.getMsgBody().getBytes(StandardCharsets.UTF_8))); - ctx.tellSelf(newTickMsg(ctx), delay); + ctx.tellNext(new TbMsg(UUIDs.timeBased(), config.getMsgType(), originatorId, metaData, config.getMsgBody().getBytes(StandardCharsets.UTF_8))); + sentTickMsg(ctx); } } - private TbMsg newTickMsg(TbContext ctx) { - return new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), new byte[]{}); + private void sentTickMsg(TbContext ctx) { + TbMsg tickMsg = new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), new byte[]{}); + nextTickId = tickMsg.getId(); + ctx.tellSelf(tickMsg, delay); } + @Override public void destroy() { } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java index e15b13f1ca..8eb37a2aef 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeConfiguration.java @@ -17,6 +17,8 @@ package org.thingsboard.rule.engine.debug; import lombok.Data; import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.server.common.data.EntityType; + import java.util.Map; @Data @@ -24,6 +26,8 @@ public class TbMsgGeneratorNodeConfiguration implements NodeConfiguration msgMetaData; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNode.java new file mode 100644 index 0000000000..9314921fa1 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNode.java @@ -0,0 +1,87 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry; + +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; +import org.thingsboard.rule.engine.TbNodeUtils; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; +import org.thingsboard.server.common.transport.adaptor.JsonConverter; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@Slf4j +@RuleNode( + type = ComponentType.ACTION, + name = "save timeseries data", + configClazz = TbMsgTelemetryNodeConfiguration.class, + nodeDescription = "Saves timeseries data", + nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY' message type" +) + +public class TbMsgTelemetryNode implements TbNode { + + private TbMsgTelemetryNodeConfiguration config; + + @Override + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { + this.config = TbNodeUtils.convert(configuration, TbMsgTelemetryNodeConfiguration.class); + } + + @Override + public void onMsg(TbContext ctx, TbMsg msg) { + if (!msg.getType().equals("POST_TELEMETRY")) { + ctx.tellError(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); + return; + } + + String src = new String(msg.getData(), StandardCharsets.UTF_8); + TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src)); + Map> tsKvMap = telemetryUploadRequest.getData(); + if (tsKvMap == null) { + ctx.tellError(msg, new IllegalArgumentException("Msg body us empty: " + src)); + return; + } + List tsKvEntryList = new ArrayList<>(); + for (Map.Entry> tsKvEntry : tsKvMap.entrySet()) { + for (KvEntry kvEntry : tsKvEntry.getValue()) { + tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry)); + } + } + String ttlValue = msg.getMetaData().getValue("TTL"); + long ttl = !StringUtils.isEmpty(ttlValue) ? Long.valueOf(ttlValue) : config.getDefaultTTL(); + ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg)); + } + + @Override + public void destroy() { + } + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNodeConfiguration.java new file mode 100644 index 0000000000..8d6b7dcf6e --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTelemetryNodeConfiguration.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.telemetry; + +import lombok.Data; +import org.thingsboard.rule.engine.api.NodeConfiguration; + +import java.util.Map; + +@Data +public class TbMsgTelemetryNodeConfiguration implements NodeConfiguration { + + private long defaultTTL; + + @Override + public TbMsgTelemetryNodeConfiguration defaultConfiguration() { + TbMsgTelemetryNodeConfiguration configuration = new TbMsgTelemetryNodeConfiguration(); + configuration.setDefaultTTL(0L); + return configuration; + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java new file mode 100644 index 0000000000..a7893263c1 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TelemetryNodeCallback.java @@ -0,0 +1,27 @@ +package org.thingsboard.rule.engine.telemetry; + +import com.google.common.util.concurrent.FutureCallback; +import lombok.Data; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.server.common.msg.TbMsg; + +import javax.annotation.Nullable; + +/** + * Created by ashvayka on 02.04.18. + */ +@Data +class TelemetryNodeCallback implements FutureCallback { + private final TbContext ctx; + private final TbMsg msg; + + @Override + public void onSuccess(@Nullable Void result) { + ctx.tellNext(msg); + } + + @Override + public void onFailure(Throwable t) { + ctx.tellError(msg, t); + } +}