diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 263b4849fa..5c8503650b 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -24,12 +24,15 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.rule.engine.api.util.DonAsynchron; +import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseTsKvQuery; @@ -42,6 +45,7 @@ import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvQuery; +import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -101,6 +105,10 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio @Lazy private DeviceStateService stateService; + @Autowired + @Lazy + private ActorService actorService; + private ExecutorService tsCallBackExecutor; private ExecutorService wsCallBackExecutor; @@ -203,6 +211,13 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio , System.currentTimeMillis())), callback); } + @Override + public void onSharedAttributesUpdate(TenantId tenantId, DeviceId deviceId, Set attributes) { + DeviceAttributesEventNotificationMsg notificationMsg = DeviceAttributesEventNotificationMsg.onUpdate(tenantId, + deviceId, DataConstants.SHARED_SCOPE, new ArrayList<>(attributes)); + actorService.onMsg(new SendToClusterMsg(deviceId, notificationMsg)); + } + @Override public void onNewRemoteSubscription(ServerAddress serverAddress, byte[] data) { ClusterAPIProtos.SubscriptionProto proto; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index aa57a027b8..c70b9060ee 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -16,11 +16,14 @@ package org.thingsboard.rule.engine.api; import com.google.common.util.concurrent.FutureCallback; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.List; +import java.util.Set; /** * Created by ashvayka on 02.04.18. @@ -41,4 +44,6 @@ public interface RuleEngineTelemetryService { void saveAttrAndNotify(EntityId entityId, String scope, String key, boolean value, FutureCallback callback); + void onSharedAttributesUpdate(TenantId tenantId, DeviceId deviceId, Set attributes); + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index 4f82884f33..5cdb04ed5b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -17,12 +17,15 @@ package org.thingsboard.rule.engine.telemetry; import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -62,6 +65,9 @@ public class TbMsgAttributesNode implements TbNode { String src = msg.getData(); Set attributes = JsonConverter.convertToAttributes(new JsonParser().parse(src)).getAttributes(); ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), config.getScope(), new ArrayList<>(attributes), new TelemetryNodeCallback(ctx, msg)); + if (msg.getOriginator().getEntityType() == EntityType.DEVICE && DataConstants.SHARED_SCOPE.equals(config.getScope())) { + ctx.getTelemetryService().onSharedAttributesUpdate(ctx.getTenantId(), new DeviceId(msg.getOriginator().getId()), attributes); + } } @Override