diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index 40e43cb6fa..e5699c895a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -350,6 +350,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor() { @Override public void onSuccess(@Nullable Void aVoid) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index aab403cd01..b6b3f41a33 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -25,20 +25,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; -import org.thingsboard.server.actors.service.ActorService; -import org.thingsboard.server.common.data.Event; -import org.thingsboard.server.common.data.edge.Edge; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.TimePageData; -import org.thingsboard.server.common.data.page.TimePageLink; -import org.thingsboard.server.dao.asset.AssetService; -import org.thingsboard.server.dao.attributes.AttributesService; -import org.thingsboard.server.dao.device.DeviceService; -import org.thingsboard.server.dao.edge.EdgeService; -import org.thingsboard.server.dao.entityview.EntityViewService; -import org.thingsboard.server.dao.event.EventService; import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc; import org.thingsboard.server.gen.edge.RequestMsg; import org.thingsboard.server.gen.edge.ResponseMsg; @@ -52,7 +41,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; @Service @Slf4j @@ -74,24 +62,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase { @Autowired private EdgeContextComponent ctx; - @Autowired - private EdgeService edgeService; - - @Autowired - private AssetService assetService; - - @Autowired - private DeviceService deviceService; - - @Autowired - private EntityViewService entityViewService; - - @Autowired - private AttributesService attributesService; - - @Autowired - private ActorService actorService; - private Server server; private ExecutorService executor; @@ -156,4 +126,5 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase { private void onEdgeDisconnect(EdgeId edgeId) { sessions.remove(edgeId); } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 0220c32e56..b9dbe393e3 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.Event; +import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeQueueEntry; @@ -59,6 +60,7 @@ import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; import org.thingsboard.server.dao.util.mapping.JacksonUtil; +import org.thingsboard.server.gen.edge.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.AssetUpdateMsg; import org.thingsboard.server.gen.edge.ConnectRequestMsg; import org.thingsboard.server.gen.edge.ConnectResponseCode; @@ -68,6 +70,7 @@ import org.thingsboard.server.gen.edge.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.DownlinkMsg; import org.thingsboard.server.gen.edge.EdgeConfiguration; import org.thingsboard.server.gen.edge.EntityDataProto; +import org.thingsboard.server.gen.edge.EntityUpdateMsg; import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; import org.thingsboard.server.gen.edge.NodeConnectionInfoProto; import org.thingsboard.server.gen.edge.RequestMsg; @@ -140,11 +143,6 @@ public final class EdgeGrpcSession implements Cloneable { .setUplinkResponseMsg(processUplinkMsg(requestMsg.getUplinkMsg())) .build()); } - if (requestMsg.getMsgType().equals(RequestMsgType.DEVICE_UPDATE_RPC_MESSAGE) && requestMsg.hasDeviceUpdateMsg()) { - outputStream.onNext(ResponseMsg.newBuilder() - .setUplinkResponseMsg(processUplinkMsg(requestMsg.getUplinkMsg())) - .build()); - } } } @@ -161,6 +159,7 @@ public final class EdgeGrpcSession implements Cloneable { }; } + void processHandleMessages() throws ExecutionException, InterruptedException { Long queueStartTs = getQueueStartTs().get(); // TODO: this 100 value must be changed properly @@ -170,7 +169,6 @@ public final class EdgeGrpcSession implements Cloneable { do { pageData = ctx.getEdgeService().findQueueEvents(edge.getTenantId(), edge.getId(), pageLink); if (!pageData.getData().isEmpty()) { - edge = ctx.getEdgeService().findEdgeById(edge.getTenantId(), edge.getId()); for (Event event : pageData.getData()) { EdgeQueueEntry entry; try { @@ -181,6 +179,8 @@ public final class EdgeGrpcSession implements Cloneable { case ENTITY_DELETED_RPC_MESSAGE: case ENTITY_UPDATED_RPC_MESSAGE: case ENTITY_CREATED_RPC_MESSAGE: + case ALARM_ACK_RPC_MESSAGE: + case ALARM_CLEARK_RPC_MESSAGE: processEntityCRUDMessage(entry, msgType); break; case RULE_CHAIN_CUSTOM_MESSAGE: @@ -239,6 +239,10 @@ public final class EdgeGrpcSession implements Cloneable { private void processEntityCRUDMessage(EdgeQueueEntry entry, UpdateMsgType msgType) throws java.io.IOException { log.trace("Executing processEntityCRUDMessage, entry [{}], msgType [{}]", entry, msgType); switch (entry.getEntityType()) { + case EDGE: + Edge edge = objectMapper.readValue(entry.getData(), Edge.class); + onEdgeUpdated(msgType, edge); + break; case DEVICE: Device device = objectMapper.readValue(entry.getData(), Device.class); onDeviceUpdated(msgType, device); @@ -263,6 +267,10 @@ public final class EdgeGrpcSession implements Cloneable { RuleChainMetaData ruleChainMetaData = objectMapper.readValue(entry.getData(), RuleChainMetaData.class); onRuleChainMetadataUpdated(msgType, ruleChainMetaData); break; + case ALARM: + Alarm alarm = objectMapper.readValue(entry.getData(), Alarm.class); + onAlarmUpdated(msgType, alarm); + break; } } @@ -284,45 +292,107 @@ public final class EdgeGrpcSession implements Cloneable { } ); } + private void onEdgeUpdated(UpdateMsgType msgType, Edge edge) { + // TODO: voba add configuration update to edge + this.edge = edge; + } + private void onDeviceUpdated(UpdateMsgType msgType, Device device) { - outputStream.onNext(ResponseMsg.newBuilder() + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() .setDeviceUpdateMsg(constructDeviceUpdatedMsg(msgType, device)) + .build(); + outputStream.onNext(ResponseMsg.newBuilder() + .setEntityUpdateMsg(entityUpdateMsg) .build()); } private void onAssetUpdated(UpdateMsgType msgType, Asset asset) { - outputStream.onNext(ResponseMsg.newBuilder() + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() .setAssetUpdateMsg(constructAssetUpdatedMsg(msgType, asset)) + .build(); + outputStream.onNext(ResponseMsg.newBuilder() + .setEntityUpdateMsg(entityUpdateMsg) .build()); } private void onEntityViewUpdated(UpdateMsgType msgType, EntityView entityView) { - outputStream.onNext(ResponseMsg.newBuilder() + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() .setEntityViewUpdateMsg(constructEntityViewUpdatedMsg(msgType, entityView)) + .build(); + outputStream.onNext(ResponseMsg.newBuilder() + .setEntityUpdateMsg(entityUpdateMsg) .build()); } private void onRuleChainUpdated(UpdateMsgType msgType, RuleChain ruleChain) { - outputStream.onNext(ResponseMsg.newBuilder() + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() .setRuleChainUpdateMsg(constructRuleChainUpdatedMsg(msgType, ruleChain)) + .build(); + outputStream.onNext(ResponseMsg.newBuilder() + .setEntityUpdateMsg(entityUpdateMsg) .build()); } private void onRuleChainMetadataUpdated(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) { RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData); if (ruleChainMetadataUpdateMsg != null) { - outputStream.onNext(ResponseMsg.newBuilder() + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() .setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg) + .build(); + outputStream.onNext(ResponseMsg.newBuilder() + .setEntityUpdateMsg(entityUpdateMsg) .build()); } } private void onDashboardUpdated(UpdateMsgType msgType, Dashboard dashboard) { - outputStream.onNext(ResponseMsg.newBuilder() + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() .setDashboardUpdateMsg(constructDashboardUpdatedMsg(msgType, dashboard)) + .build(); + outputStream.onNext(ResponseMsg.newBuilder() + .setEntityUpdateMsg(entityUpdateMsg) .build()); } + private void onAlarmUpdated(UpdateMsgType msgType, Alarm alarm) { + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() + .setAlarmUpdateMsg(constructAlarmUpdatedMsg(msgType, alarm)) + .build(); + outputStream.onNext(ResponseMsg.newBuilder() + .setEntityUpdateMsg(entityUpdateMsg) + .build()); + } + + private AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm) { + String entityName = null; + switch (alarm.getOriginator().getEntityType()) { + case DEVICE: + entityName = ctx.getDeviceService().findDeviceById(edge.getTenantId(), new DeviceId(alarm.getOriginator().getId())).getName(); + break; + case ASSET: + entityName = ctx.getAssetService().findAssetById(edge.getTenantId(), new AssetId(alarm.getOriginator().getId())).getName(); + break; + case ENTITY_VIEW: + entityName = ctx.getEntityViewService().findEntityViewById(edge.getTenantId(), new EntityViewId(alarm.getOriginator().getId())).getName(); + break; + } + AlarmUpdateMsg.Builder builder = AlarmUpdateMsg.newBuilder() + .setMsgType(msgType) + .setName(alarm.getName()) + .setType(alarm.getName()) + .setOriginatorName(entityName) + .setOriginatorType(alarm.getOriginator().getEntityType().name()) + .setSeverity(alarm.getSeverity().name()) + .setStatus(alarm.getStatus().name()) + .setStartTs(alarm.getStartTs()) + .setEndTs(alarm.getEndTs()) + .setAckTs(alarm.getAckTs()) + .setClearTs(alarm.getClearTs()) + .setDetails(JacksonUtil.toString(alarm.getDetails())) + .setPropagate(alarm.isPropagate()); + return builder.build(); + } + private UpdateMsgType getResponseMsgType(String msgType) { if (msgType.equals(SessionMsgType.POST_TELEMETRY_REQUEST.name()) || msgType.equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) || @@ -504,31 +574,31 @@ public final class EdgeGrpcSession implements Cloneable { if (uplinkMsg.getEntityDataList() != null && !uplinkMsg.getEntityDataList().isEmpty()) { for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { TbMsg tbMsg = null; - TbMsg tmp = TbMsg.fromBytes(entityData.getTbMsg().toByteArray()); - switch (tmp.getOriginator().getEntityType()) { + TbMsg originalTbMsg = TbMsg.fromBytes(entityData.getTbMsg().toByteArray()); + switch (originalTbMsg.getOriginator().getEntityType()) { case DEVICE: String deviceName = entityData.getEntityName(); String deviceType = entityData.getEntityType(); Device device = getOrCreateDevice(deviceName, deviceType); if (device != null) { - tbMsg = new TbMsg(UUIDs.timeBased(), tmp.getType(), device.getId(), tmp.getMetaData().copy(), - tmp.getDataType(), tmp.getData(), null, null, 0L); + tbMsg = new TbMsg(UUIDs.timeBased(), originalTbMsg.getType(), device.getId(), originalTbMsg.getMetaData().copy(), + originalTbMsg.getDataType(), originalTbMsg.getData(), null, null, 0L); } break; case ASSET: String assetName = entityData.getEntityName(); Asset asset = ctx.getAssetService().findAssetByTenantIdAndName(edge.getTenantId(), assetName); if (asset != null) { - tbMsg = new TbMsg(UUIDs.timeBased(), tmp.getType(), asset.getId(), tmp.getMetaData().copy(), - tmp.getDataType(), tmp.getData(), null, null, 0L); + tbMsg = new TbMsg(UUIDs.timeBased(), originalTbMsg.getType(), asset.getId(), originalTbMsg.getMetaData().copy(), + originalTbMsg.getDataType(), originalTbMsg.getData(), null, null, 0L); } break; case ENTITY_VIEW: String entityViewName = entityData.getEntityName(); EntityView entityView = ctx.getEntityViewService().findEntityViewByTenantIdAndName(edge.getTenantId(), entityViewName); if (entityView != null) { - tbMsg = new TbMsg(UUIDs.timeBased(), tmp.getType(), entityView.getId(), tmp.getMetaData().copy(), - tmp.getDataType(), tmp.getData(), null, null, 0L); + tbMsg = new TbMsg(UUIDs.timeBased(), originalTbMsg.getType(), entityView.getId(), originalTbMsg.getMetaData().copy(), + originalTbMsg.getDataType(), originalTbMsg.getData(), null, null, 0L); } break; } @@ -537,6 +607,17 @@ public final class EdgeGrpcSession implements Cloneable { } } } + if (uplinkMsg.getDeviceUpdateMsgList() != null && !uplinkMsg.getDeviceUpdateMsgList().isEmpty()) { + for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) { + String deviceName = deviceUpdateMsg.getName(); + String deviceType = deviceUpdateMsg.getType(); + switch (deviceUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + getOrCreateDevice(deviceName, deviceType); + break; + } + } + } } catch (Exception e) { return UplinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(e.getMessage()).build(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntityType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntityType.java index 1be0d7c01d..b0f228e39a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntityType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntityType.java @@ -16,5 +16,5 @@ package org.thingsboard.server.common.data.edge; public enum EdgeQueueEntityType { - DASHBOARD, ASSET, DEVICE, ENTITY_VIEW, ALARM, RULE_CHAIN, RULE_CHAIN_METADATA + DASHBOARD, ASSET, DEVICE, ENTITY_VIEW, ALARM, RULE_CHAIN, RULE_CHAIN_METADATA, EDGE } diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java index 75b3756ce6..d0fadbcaed 100644 --- a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java +++ b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java @@ -24,21 +24,16 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.edge.exception.EdgeConnectionException; -import org.thingsboard.server.gen.edge.AssetUpdateMsg; import org.thingsboard.server.gen.edge.ConnectRequestMsg; import org.thingsboard.server.gen.edge.ConnectResponseCode; import org.thingsboard.server.gen.edge.ConnectResponseMsg; -import org.thingsboard.server.gen.edge.DashboardUpdateMsg; -import org.thingsboard.server.gen.edge.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.DownlinkMsg; import org.thingsboard.server.gen.edge.EdgeConfiguration; import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc; -import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; +import org.thingsboard.server.gen.edge.EntityUpdateMsg; import org.thingsboard.server.gen.edge.RequestMsg; import org.thingsboard.server.gen.edge.RequestMsgType; import org.thingsboard.server.gen.edge.ResponseMsg; -import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg; -import org.thingsboard.server.gen.edge.RuleChainUpdateMsg; import org.thingsboard.server.gen.edge.UplinkMsg; import org.thingsboard.server.gen.edge.UplinkResponseMsg; @@ -72,12 +67,7 @@ public class EdgeGrpcClient implements EdgeRpcClient { String edgeSecret, Consumer onUplinkResponse, Consumer onEdgeUpdate, - Consumer onDeviceUpdate, - Consumer onAssetUpdate, - Consumer onEntityViewUpdate, - Consumer onRuleChainUpdate, - Consumer onRuleChainMetadataUpdate, - Consumer onDashboardUpdate, + Consumer onEntityUpdate, Consumer onDownlink, Consumer onError) { NettyChannelBuilder builder = NettyChannelBuilder.forAddress(rpcHost, rpcPort).usePlaintext(); @@ -92,7 +82,7 @@ public class EdgeGrpcClient implements EdgeRpcClient { channel = builder.build(); EdgeRpcServiceGrpc.EdgeRpcServiceStub stub = EdgeRpcServiceGrpc.newStub(channel); log.info("[{}] Sending a connect request to the TB!", edgeKey); - this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDeviceUpdate, onAssetUpdate, onEntityViewUpdate, onRuleChainUpdate, onRuleChainMetadataUpdate, onDashboardUpdate, onDownlink, onError)); + this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onEntityUpdate, onDownlink, onError)); this.inputStream.onNext(RequestMsg.newBuilder() .setMsgType(RequestMsgType.CONNECT_RPC_MESSAGE) .setConnectRequestMsg(ConnectRequestMsg.newBuilder().setEdgeRoutingKey(edgeKey).setEdgeSecret(edgeSecret).build()) @@ -118,12 +108,7 @@ public class EdgeGrpcClient implements EdgeRpcClient { private StreamObserver initOutputStream(String edgeKey, Consumer onUplinkResponse, Consumer onEdgeUpdate, - Consumer onDeviceUpdate, - Consumer onAssetUpdate, - Consumer onEntityViewUpdate, - Consumer onRuleChainUpdate, - Consumer onRuleChainMetadataUpdate, - Consumer onDashboardUpdate, + Consumer onEntityUpdate, Consumer onDownlink, Consumer onError) { return new StreamObserver() { @@ -141,24 +126,9 @@ public class EdgeGrpcClient implements EdgeRpcClient { } else if (responseMsg.hasUplinkResponseMsg()) { log.debug("[{}] Uplink response message received {}", edgeKey, responseMsg.getUplinkResponseMsg()); onUplinkResponse.accept(responseMsg.getUplinkResponseMsg()); - } else if (responseMsg.hasDeviceUpdateMsg()) { - log.debug("[{}] Device update message received {}", edgeKey, responseMsg.getDeviceUpdateMsg()); - onDeviceUpdate.accept(responseMsg.getDeviceUpdateMsg()); - } else if (responseMsg.hasAssetUpdateMsg()) { - log.debug("[{}] Asset update message received {}", edgeKey, responseMsg.getAssetUpdateMsg()); - onAssetUpdate.accept(responseMsg.getAssetUpdateMsg()); - } else if (responseMsg.hasEntityViewUpdateMsg()) { - log.debug("[{}] EntityView update message received {}", edgeKey, responseMsg.getEntityViewUpdateMsg()); - onEntityViewUpdate.accept(responseMsg.getEntityViewUpdateMsg()); - } else if (responseMsg.hasRuleChainUpdateMsg()) { - log.debug("[{}] Rule Chain udpate message received {}", edgeKey, responseMsg.getRuleChainUpdateMsg()); - onRuleChainUpdate.accept(responseMsg.getRuleChainUpdateMsg()); - } else if (responseMsg.hasRuleChainMetadataUpdateMsg()) { - log.debug("[{}] Rule Chain Metadata udpate message received {}", edgeKey, responseMsg.getRuleChainMetadataUpdateMsg()); - onRuleChainMetadataUpdate.accept(responseMsg.getRuleChainMetadataUpdateMsg()); - } else if (responseMsg.hasDashboardUpdateMsg()) { - log.debug("[{}] Dashboard message received {}", edgeKey, responseMsg.getDashboardUpdateMsg()); - onDashboardUpdate.accept(responseMsg.getDashboardUpdateMsg()); + } else if (responseMsg.hasEntityUpdateMsg()) { + log.debug("[{}] Entity update message received {}", edgeKey, responseMsg.getEntityUpdateMsg()); + onEntityUpdate.accept(responseMsg.getEntityUpdateMsg()); } else if (responseMsg.hasDownlinkMsg()) { log.debug("[{}] Downlink message received for rule chain {}", edgeKey, responseMsg.getDownlinkMsg()); onDownlink.accept(responseMsg.getDownlinkMsg()); diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java index 99ff2e4ddd..da586c79bc 100644 --- a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java +++ b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java @@ -15,14 +15,9 @@ */ package org.thingsboard.edge.rpc; -import org.thingsboard.server.gen.edge.AssetUpdateMsg; -import org.thingsboard.server.gen.edge.DashboardUpdateMsg; -import org.thingsboard.server.gen.edge.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.DownlinkMsg; import org.thingsboard.server.gen.edge.EdgeConfiguration; -import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; -import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg; -import org.thingsboard.server.gen.edge.RuleChainUpdateMsg; +import org.thingsboard.server.gen.edge.EntityUpdateMsg; import org.thingsboard.server.gen.edge.UplinkMsg; import org.thingsboard.server.gen.edge.UplinkResponseMsg; @@ -34,12 +29,7 @@ public interface EdgeRpcClient { String integrationSecret, Consumer onUplinkResponse, Consumer onEdgeUpdate, - Consumer onDeviceUpdate, - Consumer onAssetUpdate, - Consumer onEntityViewUpdate, - Consumer onRuleChainUpdate, - Consumer onRuleChainMetadataUpdate, - Consumer onDashboardUpdate, + Consumer onEntityUpdate, Consumer onDownlink, Consumer onError); diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 2ed8df52da..74a3321cd9 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -34,26 +34,29 @@ service EdgeRpcService { message RequestMsg { RequestMsgType msgType = 1; ConnectRequestMsg connectRequestMsg = 2; - DeviceUpdateMsg deviceUpdateMsg = 3; - UplinkMsg uplinkMsg = 4; + UplinkMsg uplinkMsg = 3; } message ResponseMsg { ConnectResponseMsg connectResponseMsg = 1; UplinkResponseMsg uplinkResponseMsg = 2; - DeviceUpdateMsg deviceUpdateMsg = 3; - RuleChainUpdateMsg ruleChainUpdateMsg = 4; - RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 5; - DashboardUpdateMsg dashboardUpdateMsg = 6; - AssetUpdateMsg assetUpdateMsg = 7; - EntityViewUpdateMsg entityViewUpdateMsg = 8; - DownlinkMsg downlinkMsg = 9; + EntityUpdateMsg entityUpdateMsg = 3; + DownlinkMsg downlinkMsg = 4; +} + +message EntityUpdateMsg { + DeviceUpdateMsg deviceUpdateMsg = 1; + RuleChainUpdateMsg ruleChainUpdateMsg = 2; + RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 3; + DashboardUpdateMsg dashboardUpdateMsg = 4; + AssetUpdateMsg assetUpdateMsg = 5; + EntityViewUpdateMsg entityViewUpdateMsg = 6; + AlarmUpdateMsg alarmUpdateMsg = 7; } enum RequestMsgType { CONNECT_RPC_MESSAGE = 0; UPLINK_RPC_MESSAGE = 1; - DEVICE_UPDATE_RPC_MESSAGE = 2; } message ConnectRequestMsg { @@ -85,7 +88,9 @@ enum UpdateMsgType { ENTITY_CREATED_RPC_MESSAGE = 0; ENTITY_UPDATED_RPC_MESSAGE = 1; ENTITY_DELETED_RPC_MESSAGE = 2; - RULE_CHAIN_CUSTOM_MESSAGE = 3; + ALARM_ACK_RPC_MESSAGE = 3; + ALARM_CLEARK_RPC_MESSAGE = 4; + RULE_CHAIN_CUSTOM_MESSAGE = 5; } message EntityDataProto { @@ -166,6 +171,22 @@ message EntityViewUpdateMsg { EntityType relatedEntityType = 6; } +message AlarmUpdateMsg { + UpdateMsgType msgType = 1; + string name = 2; + string type = 3; + string originatorType = 4; + string originatorName = 5; + string severity = 6; + string status = 7; + int64 startTs = 8; + int64 endTs = 9; + int64 ackTs = 10; + int64 clearTs = 11; + string details = 12; + bool propagate = 13; +} + enum EntityType { DEVICE = 0; ASSET = 1; @@ -178,6 +199,8 @@ enum EntityType { message UplinkMsg { int32 uplinkMsgId = 1; repeated EntityDataProto entityData = 2; + repeated DeviceUpdateMsg deviceUpdateMsg = 3; + repeated AlarmUpdateMsg alarmUpdatemsg = 4; } message UplinkResponseMsg { diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeService.java b/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeService.java index 3e87324105..29eec1a6d9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeService.java @@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.ShortEdgeInfo; import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeQueueEntityType; @@ -151,7 +152,6 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic } } - @Override public Edge findEdgeById(TenantId tenantId, EdgeId edgeId) { log.trace("Executing findEdgeById [{}]", edgeId); @@ -349,6 +349,9 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic } else { try { switch (tbMsg.getOriginator().getEntityType()) { + case EDGE: + processEdge(tenantId, tbMsg, callback); + break; case ASSET: processAsset(tenantId, tbMsg, callback); break; @@ -364,6 +367,9 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic case ENTITY_VIEW: processEntityView(tenantId, tbMsg, callback); break; + case ALARM: + processAlarm(tenantId, tbMsg, callback); + break; default: log.debug("Entity type [{}] is not designed to be pushed to edge", tbMsg.getOriginator().getEntityType()); } @@ -374,26 +380,9 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic } private void processCustomTbMsg(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) { - EdgeId edgeId = null; - EdgeQueueEntityType edgeQueueEntityType = null; - switch (tbMsg.getOriginator().getEntityType()) { - case DEVICE: - edgeQueueEntityType = EdgeQueueEntityType.DEVICE; - Device device = deviceService.findDeviceById(tenantId, new DeviceId(tbMsg.getOriginator().getId())); - edgeId = device.getEdgeId(); - break; - case ASSET: - edgeQueueEntityType = EdgeQueueEntityType.ASSET; - Asset asset = assetService.findAssetById(tenantId, new AssetId(tbMsg.getOriginator().getId())); - edgeId = asset.getEdgeId(); - break; - case ENTITY_VIEW: - edgeQueueEntityType = EdgeQueueEntityType.ENTITY_VIEW; - EntityView entityView = entityViewService.findEntityViewById(tenantId, new EntityViewId(tbMsg.getOriginator().getId())); - edgeId = entityView.getEdgeId(); - break; - } - if (edgeId != null) { + EdgeId edgeId = getEdgeIdByOriginatorId(tenantId, tbMsg.getOriginator()); + EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(tbMsg.getOriginator().getEntityType()); + if (edgeId != null && edgeQueueEntityType != null) { try { saveEventToEdgeQueue(tenantId, edgeId, edgeQueueEntityType, tbMsg.getType(), mapper.writeValueAsString(tbMsg), callback); } catch (IOException e) { @@ -402,6 +391,37 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic } } + private EdgeQueueEntityType getEdgeQueueTypeByEntityType(EntityType entityType) { + switch (entityType) { + case DEVICE: + return EdgeQueueEntityType.DEVICE; + case ASSET: + return EdgeQueueEntityType.ASSET; + case ENTITY_VIEW: + return EdgeQueueEntityType.ENTITY_VIEW; + default: + log.info("Unsupported entity type: [{}]", entityType); + return null; + } + } + + private EdgeId getEdgeIdByOriginatorId(TenantId tenantId, EntityId originatorId) { + switch (originatorId.getEntityType()) { + case DEVICE: + Device device = deviceService.findDeviceById(tenantId, new DeviceId(originatorId.getId())); + return device.getEdgeId(); + case ASSET: + Asset asset = assetService.findAssetById(tenantId, new AssetId(originatorId.getId())); + return asset.getEdgeId(); + case ENTITY_VIEW: + EntityView entityView = entityViewService.findEntityViewById(tenantId, new EntityViewId(originatorId.getId())); + return entityView.getEdgeId(); + default: + log.info("Unsupported entity type: [{}]", originatorId.getEntityType()); + return null; + } + } + private void processDevice(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { switch (tbMsg.getType()) { case DataConstants.ENTITY_ASSIGNED_TO_EDGE: @@ -421,6 +441,21 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic } } + private void processEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { + switch (tbMsg.getType()) { + case DataConstants.ENTITY_DELETED: + case DataConstants.ENTITY_CREATED: + case DataConstants.ENTITY_UPDATED: + Edge edge = mapper.readValue(tbMsg.getData(), Edge.class); + if (edge != null) { + pushEventToEdge(tenantId, edge.getId(), EdgeQueueEntityType.EDGE, tbMsg, callback); + } + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + private void processAsset(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { switch (tbMsg.getType()) { case DataConstants.ENTITY_ASSIGNED_TO_EDGE: @@ -459,6 +494,25 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic } } + private void processAlarm(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { + switch (tbMsg.getType()) { + case DataConstants.ENTITY_DELETED: + case DataConstants.ENTITY_CREATED: + case DataConstants.ENTITY_UPDATED: + case DataConstants.ALARM_ACK: + case DataConstants.ALARM_CLEAR: + Alarm alarm = mapper.readValue(tbMsg.getData(), Alarm.class); + EdgeId edgeId = getEdgeIdByOriginatorId(tenantId, alarm.getOriginator()); + EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); + if (edgeId != null && edgeQueueEntityType != null) { + pushEventToEdge(tenantId, edgeId, EdgeQueueEntityType.ALARM, tbMsg, callback); + } + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + private void processDashboard(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DASHBOARD, callback); } @@ -562,8 +616,8 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic @Override public Edge setRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException { edge.setRootRuleChainId(ruleChainId); - Edge saveEdge = saveEdge(edge); - ruleChainService.updateEdgeRuleChains(tenantId, saveEdge.getId()); + Edge savedEdge = saveEdge(edge); + ruleChainService.updateEdgeRuleChains(tenantId, savedEdge.getId()); RuleChain ruleChain = ruleChainService.findRuleChainById(tenantId, ruleChainId); saveEventToEdgeQueue(tenantId, edge.getId(), EdgeQueueEntityType.RULE_CHAIN, DataConstants.ENTITY_UPDATED, mapper.writeValueAsString(ruleChain), new FutureCallback() { @Override @@ -576,7 +630,7 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic log.debug("Failure during event save", t); } }); - return saveEdge; + return savedEdge; } private DataValidator edgeValidator =