diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java new file mode 100644 index 0000000000..a65e95d0ff --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -0,0 +1,433 @@ +package org.thingsboard.server.service.edge; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.binary.Base64; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.Dashboard; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.Event; +import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeQueueEntityType; +import org.thingsboard.server.common.data.edge.EdgeQueueEntry; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.TimePageData; +import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.data.rule.RuleChainMetaData; +import org.thingsboard.server.common.data.rule.RuleChainType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.session.SessionMsgType; +import org.thingsboard.server.dao.asset.AssetService; +import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.edge.EdgeService; +import org.thingsboard.server.dao.entityview.EntityViewService; +import org.thingsboard.server.dao.event.EventService; +import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.dao.rule.RuleChainService; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import javax.annotation.Nullable; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Service +@TbCoreComponent +@Slf4j +public class DefaultEdgeNotificationService implements EdgeNotificationService { + + private static final ObjectMapper mapper = new ObjectMapper(); + + @Autowired + private EdgeService edgeService; + + @Autowired + private DeviceService deviceService; + + @Autowired + private AssetService assetService; + + @Autowired + private EntityViewService entityViewService; + + @Autowired + private RuleChainService ruleChainService; + + @Autowired + private RelationService relationService; + + @Autowired + private EventService eventService; + + private ExecutorService tsCallBackExecutor; + + @PostConstruct + public void initExecutor() { + tsCallBackExecutor = Executors.newSingleThreadExecutor(); + } + + @PreDestroy + public void shutdownExecutor() { + if (tsCallBackExecutor != null) { + tsCallBackExecutor.shutdownNow(); + } + } + + @Override + public TimePageData findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) { + return eventService.findEvents(tenantId, edgeId, DataConstants.EDGE_QUEUE_EVENT_TYPE, pageLink); + } + + @Override + public Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException { + edge.setRootRuleChainId(ruleChainId); + Edge savedEdge = edgeService.saveEdge(edge); + RuleChain ruleChain = ruleChainService.findRuleChainById(tenantId, ruleChainId); + saveEventToEdgeQueue(tenantId, edge.getId(), EdgeQueueEntityType.RULE_CHAIN, DataConstants.ENTITY_UPDATED, mapper.writeValueAsString(ruleChain), new FutureCallback() { + @Override + public void onSuccess(@Nullable Void aVoid) { + log.debug("Event saved successfully!"); + } + + @Override + public void onFailure(Throwable t) { + log.debug("Failure during event save", t); + } + }); + return savedEdge; + } + + private void saveEventToEdgeQueue(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, String type, String data, FutureCallback callback) throws IOException { + log.debug("Pushing single event to edge queue. tenantId [{}], edgeId [{}], entityType [{}], type[{}], data [{}]", tenantId, edgeId, entityType, type, data); + + EdgeQueueEntry queueEntry = new EdgeQueueEntry(); + queueEntry.setEntityType(entityType); + queueEntry.setType(type); + queueEntry.setData(data); + + Event event = new Event(); + event.setEntityId(edgeId); + event.setTenantId(tenantId); + event.setType(DataConstants.EDGE_QUEUE_EVENT_TYPE); + event.setBody(mapper.valueToTree(queueEntry)); + ListenableFuture saveFuture = eventService.saveAsync(event); + + addMainCallback(saveFuture, callback); + } + + private void addMainCallback(ListenableFuture saveFuture, final FutureCallback callback) { + Futures.addCallback(saveFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable Event result) { + callback.onSuccess(null); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }, tsCallBackExecutor); + } + + @Override + public void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback) { + if (tbMsg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name()) || + tbMsg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) || + tbMsg.getType().equals(DataConstants.ATTRIBUTES_UPDATED) || + tbMsg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) { + processCustomTbMsg(tenantId, tbMsg, callback); + } else { + try { + switch (tbMsg.getOriginator().getEntityType()) { + case EDGE: + processEdge(tenantId, tbMsg, callback); + break; + case ASSET: + processAsset(tenantId, tbMsg, callback); + break; + case DEVICE: + processDevice(tenantId, tbMsg, callback); + break; + case DASHBOARD: + processDashboard(tenantId, tbMsg, callback); + break; + case RULE_CHAIN: + processRuleChain(tenantId, tbMsg, callback); + break; + case ENTITY_VIEW: + processEntityView(tenantId, tbMsg, callback); + break; + case ALARM: + processAlarm(tenantId, tbMsg, callback); + break; + default: + log.debug("Entity type [{}] is not designed to be pushed to edge", tbMsg.getOriginator().getEntityType()); + } + } catch (IOException e) { + log.error("Can't push to edge updates, entity type [{}], data [{}]", tbMsg.getOriginator().getEntityType(), tbMsg.getData(), e); + } + } + } + + + private void processCustomTbMsg(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) { + ListenableFuture edgeIdFuture = getEdgeIdByOriginatorId(tenantId, tbMsg.getOriginator()); + Futures.transform(edgeIdFuture, edgeId -> { + EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(tbMsg.getOriginator().getEntityType()); + if (edgeId != null && edgeQueueEntityType != null) { + try { + saveEventToEdgeQueue(tenantId, edgeId, edgeQueueEntityType, tbMsg.getType(), Base64.encodeBase64String(TbMsg.toByteArray(tbMsg)), callback); + } catch (IOException e) { + log.error("Error while saving custom tbMsg into Edge Queue", e); + } + } + return null; + }, MoreExecutors.directExecutor()); + } + + private void processDevice(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { + switch (tbMsg.getType()) { + case DataConstants.ENTITY_ASSIGNED_TO_EDGE: + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DEVICE, callback); + break; + case DataConstants.ENTITY_DELETED: + case DataConstants.ENTITY_CREATED: + case DataConstants.ENTITY_UPDATED: + Device device = mapper.readValue(tbMsg.getData(), Device.class); + pushEventToEdge(tenantId, device.getId(), EdgeQueueEntityType.DEVICE, tbMsg, callback); + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + + private void processEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { + switch (tbMsg.getType()) { + case DataConstants.ENTITY_DELETED: + case DataConstants.ENTITY_CREATED: + case DataConstants.ENTITY_UPDATED: + // TODO: voba - handle properly edge creation + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + + private void processAsset(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { + switch (tbMsg.getType()) { + case DataConstants.ENTITY_ASSIGNED_TO_EDGE: + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ASSET, callback); + break; + case DataConstants.ENTITY_DELETED: + case DataConstants.ENTITY_CREATED: + case DataConstants.ENTITY_UPDATED: + Asset asset = mapper.readValue(tbMsg.getData(), Asset.class); + pushEventToEdge(tenantId, asset.getId(), EdgeQueueEntityType.ASSET, tbMsg, callback); + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + + private void processEntityView(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { + switch (tbMsg.getType()) { + case DataConstants.ENTITY_ASSIGNED_TO_EDGE: + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ENTITY_VIEW, callback); + break; + case DataConstants.ENTITY_DELETED: + case DataConstants.ENTITY_CREATED: + case DataConstants.ENTITY_UPDATED: + EntityView entityView = mapper.readValue(tbMsg.getData(), EntityView.class); + pushEventToEdge(tenantId, entityView.getId(), EdgeQueueEntityType.ENTITY_VIEW, tbMsg, callback); + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + + private void processAlarm(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { + switch (tbMsg.getType()) { + case DataConstants.ENTITY_DELETED: + case DataConstants.ENTITY_CREATED: + case DataConstants.ENTITY_UPDATED: + case DataConstants.ALARM_ACK: + case DataConstants.ALARM_CLEAR: + Alarm alarm = mapper.readValue(tbMsg.getData(), Alarm.class); + EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); + if (edgeQueueEntityType != null) { + pushEventToEdge(tenantId, alarm.getOriginator(), EdgeQueueEntityType.ALARM, tbMsg, callback); + } + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + + private void processDashboard(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { + switch (tbMsg.getType()) { + case DataConstants.ENTITY_ASSIGNED_TO_EDGE: + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DASHBOARD, callback); + break; + case DataConstants.ENTITY_DELETED: + case DataConstants.ENTITY_CREATED: + case DataConstants.ENTITY_UPDATED: + Dashboard dashboard = mapper.readValue(tbMsg.getData(), Dashboard.class); + ListenableFuture> future = edgeService.findEdgesByTenantIdAndDashboardId(tenantId, dashboard.getId(), new TimePageLink(Integer.MAX_VALUE)); + Futures.transform(future, edges -> { + if (edges != null && edges.getData() != null && !edges.getData().isEmpty()) { + try { + for (Edge edge : edges.getData()) { + pushEventToEdge(tenantId, edge.getId(), EdgeQueueEntityType.DASHBOARD, tbMsg, callback); + } + } catch (IOException e) { + log.error("Can't push event to edge", e); + } + } + return null; + }, MoreExecutors.directExecutor()); + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + + private void processRuleChain(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { + switch (tbMsg.getType()) { + case DataConstants.ENTITY_ASSIGNED_TO_EDGE: + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: + processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.RULE_CHAIN, callback); + break; + case DataConstants.ENTITY_DELETED: + case DataConstants.ENTITY_CREATED: + case DataConstants.ENTITY_UPDATED: + RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class); + if (RuleChainType.EDGE.equals(ruleChain.getType())) { + ListenableFuture> future = edgeService.findEdgesByTenantIdAndRuleChainId(tenantId, ruleChain.getId(), new TimePageLink(Integer.MAX_VALUE)); + Futures.transform(future, edges -> { + if (edges != null && edges.getData() != null && !edges.getData().isEmpty()) { + try { + for (Edge edge : edges.getData()) { + pushEventToEdge(tenantId, edge.getId(), EdgeQueueEntityType.RULE_CHAIN, tbMsg, callback); + } + } catch (IOException e) { + log.error("Can't push event to edge", e); + } + } + return null; + }, MoreExecutors.directExecutor()); + } + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + + + private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EdgeQueueEntityType entityType, FutureCallback callback) throws IOException { + EdgeId edgeId; + switch (tbMsg.getType()) { + case DataConstants.ENTITY_ASSIGNED_TO_EDGE: + edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("assignedEdgeId"))); + pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback); + break; + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: + edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("unassignedEdgeId"))); + pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback); + break; + + } + } + + + private void pushEventToEdge(TenantId tenantId, EntityId originatorId, EdgeQueueEntityType edgeQueueEntityType, TbMsg tbMsg, FutureCallback callback) { + ListenableFuture edgeIdFuture = getEdgeIdByOriginatorId(tenantId, originatorId); + Futures.transform(edgeIdFuture, edgeId -> { + if (edgeId != null) { + try { + pushEventToEdge(tenantId, edgeId, edgeQueueEntityType, tbMsg, callback); + } catch (Exception e) { + log.error("Failed to push event to edge, edgeId [{}], tbMsg [{}]", edgeId, tbMsg, e); + } + } + return null; + }, + MoreExecutors.directExecutor()); + } + + + private ListenableFuture getEdgeIdByOriginatorId(TenantId tenantId, EntityId originatorId) { + List originatorEdgeRelations = relationService.findByToAndType(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); + if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0) { + return Futures.immediateFuture(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId())); + } else { + return Futures.immediateFuture(null); + } + } + + + private void pushEventToEdge(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, TbMsg tbMsg, FutureCallback callback) throws IOException { + log.debug("Pushing event(s) to edge queue. tenantId [{}], edgeId [{}], entityType [{}], tbMsg [{}]", tenantId, edgeId, entityType, tbMsg); + + saveEventToEdgeQueue(tenantId, edgeId, entityType, tbMsg.getType(), tbMsg.getData(), callback); + + if (entityType.equals(EdgeQueueEntityType.RULE_CHAIN)) { + pushRuleChainMetadataToEdge(tenantId, edgeId, tbMsg, callback); + } + } + + private void pushRuleChainMetadataToEdge(TenantId tenantId, EdgeId edgeId, TbMsg tbMsg, FutureCallback callback) throws IOException { + RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class); + switch (tbMsg.getType()) { + case DataConstants.ENTITY_ASSIGNED_TO_EDGE: + case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: + case DataConstants.ENTITY_UPDATED: + RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, ruleChain.getId()); + saveEventToEdgeQueue(tenantId, edgeId, EdgeQueueEntityType.RULE_CHAIN_METADATA, tbMsg.getType(), mapper.writeValueAsString(ruleChainMetaData), callback); + break; + default: + log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); + } + } + + + private EdgeQueueEntityType getEdgeQueueTypeByEntityType(EntityType entityType) { + switch (entityType) { + case DEVICE: + return EdgeQueueEntityType.DEVICE; + case ASSET: + return EdgeQueueEntityType.ASSET; + case ENTITY_VIEW: + return EdgeQueueEntityType.ENTITY_VIEW; + default: + log.info("Unsupported entity type: [{}]", entityType); + return null; + } + } +} + diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index a7a9ee1bed..26e094f9b3 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -53,6 +53,10 @@ public class EdgeContextComponent { @Autowired private EdgeService edgeService; + @Lazy + @Autowired + private EdgeNotificationService edgeNotificationService; + @Lazy @Autowired private AssetService assetService; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeNotificationService.java new file mode 100644 index 0000000000..f4de0f974e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeNotificationService.java @@ -0,0 +1,22 @@ +package org.thingsboard.server.service.edge; + +import org.thingsboard.server.common.data.Event; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.TimePageData; +import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.io.IOException; + +public interface EdgeNotificationService { + + TimePageData findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink); + + Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException; + + void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback); +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index f722e09de5..1e77f438c5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -172,7 +172,7 @@ public final class EdgeGrpcSession implements Closeable { TimePageData pageData; UUID ifOffset = null; do { - pageData = ctx.getEdgeService().findQueueEvents(edge.getTenantId(), edge.getId(), pageLink); + pageData = ctx.getEdgeNotificationService().findQueueEvents(edge.getTenantId(), edge.getId(), pageLink); if (isConnected() && !pageData.getData().isEmpty()) { log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size()); for (Event event : pageData.getData()) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index b7f6bd3dd2..3c6e991d5e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -28,6 +28,7 @@ import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.EdgeNotificationMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto; @@ -42,6 +43,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionChangeEvent; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.EdgeNotificationService; import org.thingsboard.server.service.encoding.DataDecodingEncodingService; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; @@ -82,18 +84,20 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); if (actorMsg.isPresent()) { @@ -275,6 +282,13 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> findEdgeTypesByTenantId(TenantId tenantId); - void pushEventToEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback callback); - - TimePageData findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink); - - Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException; void assignDefaultRuleChainsToEdge(TenantId tenantId, EdgeId edgeId); diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index d3e850b39c..ab0193318a 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -352,6 +352,12 @@ message FromDeviceRPCResponseProto { string response = 3; int32 error = 4; } + +message EdgeNotificationMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; +} + /** * Main messages; */ @@ -377,6 +383,7 @@ message ToCoreMsg { DeviceStateServiceMsgProto deviceStateServiceMsg = 2; SubscriptionMgrMsgProto toSubscriptionMgrMsg = 3; bytes toDeviceActorNotificationMsg = 4; + EdgeNotificationMsgProto edgeNotificationMsg = 5; } /* High priority messages with low latency are handled by ThingsBoard Core Service separately */ diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 17ff2003bf..48e8912225 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,14 +15,11 @@ */ package org.thingsboard.server.dao.edge; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.codec.binary.Base64; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; @@ -31,19 +28,10 @@ import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.Customer; -import org.thingsboard.server.common.data.Dashboard; -import org.thingsboard.server.common.data.DataConstants; -import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.EntityView; -import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.Tenant; -import org.thingsboard.server.common.data.alarm.Alarm; -import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.edge.Edge; -import org.thingsboard.server.common.data.edge.EdgeQueueEntityType; -import org.thingsboard.server.common.data.edge.EdgeQueueEntry; import org.thingsboard.server.common.data.edge.EdgeSearchQuery; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DashboardId; @@ -57,19 +45,10 @@ import org.thingsboard.server.common.data.page.TimePageData; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntitySearchDirection; -import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rule.RuleChain; -import org.thingsboard.server.common.data.rule.RuleChainMetaData; -import org.thingsboard.server.common.data.rule.RuleChainType; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.session.SessionMsgType; -import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.customer.CustomerDao; import org.thingsboard.server.dao.dashboard.DashboardService; -import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.entity.AbstractEntityService; -import org.thingsboard.server.dao.entityview.EntityViewService; -import org.thingsboard.server.dao.event.EventService; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.rule.RuleChainService; @@ -79,17 +58,11 @@ import org.thingsboard.server.dao.service.Validator; import org.thingsboard.server.dao.tenant.TenantDao; import javax.annotation.Nullable; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Collectors; import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE; @@ -104,8 +77,6 @@ import static org.thingsboard.server.dao.service.Validator.validateString; @Slf4j public class EdgeServiceImpl extends AbstractEntityService implements EdgeService { - private static final ObjectMapper mapper = new ObjectMapper(); - public static final String INCORRECT_TENANT_ID = "Incorrect tenantId "; public static final String INCORRECT_PAGE_LINK = "Incorrect page link "; public static final String INCORRECT_CUSTOMER_ID = "Incorrect customerId "; @@ -123,41 +94,15 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic @Autowired private CacheManager cacheManager; - @Autowired - private EventService eventService; - @Autowired private DashboardService dashboardService; @Autowired private RuleChainService ruleChainService; - @Autowired - private DeviceService deviceService; - - @Autowired - private AssetService assetService; - - @Autowired - private EntityViewService entityViewService; - @Autowired private RelationService relationService; - private ExecutorService tsCallBackExecutor; - - @PostConstruct - public void initExecutor() { - tsCallBackExecutor = Executors.newSingleThreadExecutor(); - } - - @PreDestroy - public void shutdownExecutor() { - if (tsCallBackExecutor != null) { - tsCallBackExecutor.shutdownNow(); - } - } - @Override public Edge findEdgeById(TenantId tenantId, EdgeId edgeId) { log.trace("Executing findEdgeById [{}]", edgeId); @@ -259,7 +204,6 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic return edgeDao.findEdgesByTenantIdAndIdsAsync(tenantId.getId(), toUUIDs(edgeIds)); } - @Override public void deleteEdgesByTenantId(TenantId tenantId) { log.trace("Executing deleteEdgesByTenantId, tenantId [{}]", tenantId); @@ -344,327 +288,6 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic }, MoreExecutors.directExecutor()); } - @Override - public void pushEventToEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) { - if (tbMsg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name()) || - tbMsg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) || - tbMsg.getType().equals(DataConstants.ATTRIBUTES_UPDATED) || - tbMsg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) { - processCustomTbMsg(tenantId, tbMsg, callback); - } else { - try { - switch (tbMsg.getOriginator().getEntityType()) { - case EDGE: - processEdge(tenantId, tbMsg, callback); - break; - case ASSET: - processAsset(tenantId, tbMsg, callback); - break; - case DEVICE: - processDevice(tenantId, tbMsg, callback); - break; - case DASHBOARD: - processDashboard(tenantId, tbMsg, callback); - break; - case RULE_CHAIN: - processRuleChain(tenantId, tbMsg, callback); - break; - case ENTITY_VIEW: - processEntityView(tenantId, tbMsg, callback); - break; - case ALARM: - processAlarm(tenantId, tbMsg, callback); - break; - default: - log.debug("Entity type [{}] is not designed to be pushed to edge", tbMsg.getOriginator().getEntityType()); - } - } catch (IOException e) { - log.error("Can't push to edge updates, entity type [{}], data [{}]", tbMsg.getOriginator().getEntityType(), tbMsg.getData(), e); - } - } - } - - private void processCustomTbMsg(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) { - ListenableFuture edgeIdFuture = getEdgeIdByOriginatorId(tenantId, tbMsg.getOriginator()); - Futures.transform(edgeIdFuture, edgeId -> { - EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(tbMsg.getOriginator().getEntityType()); - if (edgeId != null && edgeQueueEntityType != null) { - try { - saveEventToEdgeQueue(tenantId, edgeId, edgeQueueEntityType, tbMsg.getType(), Base64.encodeBase64String(TbMsg.toByteArray(tbMsg)), callback); - } catch (IOException e) { - log.error("Error while saving custom tbMsg into Edge Queue", e); - } - } - return null; - }, MoreExecutors.directExecutor()); - } - - private EdgeQueueEntityType getEdgeQueueTypeByEntityType(EntityType entityType) { - switch (entityType) { - case DEVICE: - return EdgeQueueEntityType.DEVICE; - case ASSET: - return EdgeQueueEntityType.ASSET; - case ENTITY_VIEW: - return EdgeQueueEntityType.ENTITY_VIEW; - default: - log.info("Unsupported entity type: [{}]", entityType); - return null; - } - } - - private ListenableFuture getEdgeIdByOriginatorId(TenantId tenantId, EntityId originatorId) { - List originatorEdgeRelations = relationService.findByToAndType(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); - if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0) { - return Futures.immediateFuture(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId())); - } else { - return Futures.immediateFuture(null); - } - } - - private void pushEventToEdge(TenantId tenantId, EntityId originatorId, EdgeQueueEntityType edgeQueueEntityType, TbMsg tbMsg, FutureCallback callback) { - ListenableFuture edgeIdFuture = getEdgeIdByOriginatorId(tenantId, originatorId); - Futures.transform(edgeIdFuture, edgeId -> { - if (edgeId != null) { - try { - pushEventToEdge(tenantId, edgeId, edgeQueueEntityType, tbMsg, callback); - } catch (Exception e) { - log.error("Failed to push event to edge, edgeId [{}], tbMsg [{}]", edgeId, tbMsg, e); - } - } - return null; - }, - MoreExecutors.directExecutor()); - } - - private void processDevice(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { - switch (tbMsg.getType()) { - case DataConstants.ENTITY_ASSIGNED_TO_EDGE: - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: - processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DEVICE, callback); - break; - case DataConstants.ENTITY_DELETED: - case DataConstants.ENTITY_CREATED: - case DataConstants.ENTITY_UPDATED: - Device device = mapper.readValue(tbMsg.getData(), Device.class); - pushEventToEdge(tenantId, device.getId(), EdgeQueueEntityType.DEVICE, tbMsg, callback); - break; - default: - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); - } - } - - private void processEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { - switch (tbMsg.getType()) { - case DataConstants.ENTITY_DELETED: - case DataConstants.ENTITY_CREATED: - case DataConstants.ENTITY_UPDATED: - // TODO: voba - handle properly edge creation - break; - default: - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); - } - } - - private void processAsset(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { - switch (tbMsg.getType()) { - case DataConstants.ENTITY_ASSIGNED_TO_EDGE: - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: - processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ASSET, callback); - break; - case DataConstants.ENTITY_DELETED: - case DataConstants.ENTITY_CREATED: - case DataConstants.ENTITY_UPDATED: - Asset asset = mapper.readValue(tbMsg.getData(), Asset.class); - pushEventToEdge(tenantId, asset.getId(), EdgeQueueEntityType.ASSET, tbMsg, callback); - break; - default: - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); - } - } - - private void processEntityView(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { - switch (tbMsg.getType()) { - case DataConstants.ENTITY_ASSIGNED_TO_EDGE: - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: - processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ENTITY_VIEW, callback); - break; - case DataConstants.ENTITY_DELETED: - case DataConstants.ENTITY_CREATED: - case DataConstants.ENTITY_UPDATED: - EntityView entityView = mapper.readValue(tbMsg.getData(), EntityView.class); - pushEventToEdge(tenantId, entityView.getId(), EdgeQueueEntityType.ENTITY_VIEW, tbMsg, callback); - break; - default: - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); - } - } - - private void processAlarm(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { - switch (tbMsg.getType()) { - case DataConstants.ENTITY_DELETED: - case DataConstants.ENTITY_CREATED: - case DataConstants.ENTITY_UPDATED: - case DataConstants.ALARM_ACK: - case DataConstants.ALARM_CLEAR: - Alarm alarm = mapper.readValue(tbMsg.getData(), Alarm.class); - EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); - if (edgeQueueEntityType != null) { - pushEventToEdge(tenantId, alarm.getOriginator(), EdgeQueueEntityType.ALARM, tbMsg, callback); - } - break; - default: - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); - } - } - - private void processDashboard(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { - processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DASHBOARD, callback); - } - - private void processRuleChain(TenantId tenantId, TbMsg tbMsg, FutureCallback callback) throws IOException { - switch (tbMsg.getType()) { - case DataConstants.ENTITY_ASSIGNED_TO_EDGE: - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: - processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.RULE_CHAIN, callback); - break; - case DataConstants.ENTITY_DELETED: - case DataConstants.ENTITY_CREATED: - case DataConstants.ENTITY_UPDATED: - RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class); - if (RuleChainType.EDGE.equals(ruleChain.getType())) { - ListenableFuture> future = findEdgesByTenantIdAndRuleChainId(tenantId, ruleChain.getId(), new TimePageLink(Integer.MAX_VALUE)); - Futures.transform(future, edges -> { - if (edges != null && edges.getData() != null && !edges.getData().isEmpty()) { - try { - for (Edge edge : edges.getData()) { - pushEventToEdge(tenantId, edge.getId(), EdgeQueueEntityType.RULE_CHAIN, tbMsg, callback); - } - } catch (IOException e) { - log.error("Can't push event to edge", e); - } - } - return null; - }, MoreExecutors.directExecutor()); - } - break; - default: - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); - } - } - - private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EdgeQueueEntityType entityType, FutureCallback callback) throws IOException { - EdgeId edgeId; - switch (tbMsg.getType()) { - case DataConstants.ENTITY_ASSIGNED_TO_EDGE: - edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("assignedEdgeId"))); - pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback); - break; - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: - edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("unassignedEdgeId"))); - pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback); - break; - case DataConstants.ENTITY_DELETED: - case DataConstants.ENTITY_CREATED: - case DataConstants.ENTITY_UPDATED: - Dashboard dashboard = mapper.readValue(tbMsg.getData(), Dashboard.class); - ListenableFuture> future = findEdgesByTenantIdAndDashboardId(tenantId, dashboard.getId(), new TimePageLink(Integer.MAX_VALUE)); - Futures.transform(future, edges -> { - if (edges != null && edges.getData() != null && !edges.getData().isEmpty()) { - try { - for (Edge edge : edges.getData()) { - pushEventToEdge(tenantId, edge.getId(), EdgeQueueEntityType.DASHBOARD, tbMsg, callback); - } - } catch (IOException e) { - log.error("Can't push event to edge", e); - } - } - return null; - }, MoreExecutors.directExecutor()); - break; - } - } - - private void pushEventToEdge(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, TbMsg tbMsg, FutureCallback callback) throws IOException { - log.debug("Pushing event(s) to edge queue. tenantId [{}], edgeId [{}], entityType [{}], tbMsg [{}]", tenantId, edgeId, entityType, tbMsg); - - saveEventToEdgeQueue(tenantId, edgeId, entityType, tbMsg.getType(), tbMsg.getData(), callback); - - if (entityType.equals(EdgeQueueEntityType.RULE_CHAIN)) { - pushRuleChainMetadataToEdge(tenantId, edgeId, tbMsg, callback); - } - } - - private void saveEventToEdgeQueue(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, String type, String data, FutureCallback callback) throws IOException { - log.debug("Pushing single event to edge queue. tenantId [{}], edgeId [{}], entityType [{}], type[{}], data [{}]", tenantId, edgeId, entityType, type, data); - - EdgeQueueEntry queueEntry = new EdgeQueueEntry(); - queueEntry.setEntityType(entityType); - queueEntry.setType(type); - queueEntry.setData(data); - - Event event = new Event(); - event.setEntityId(edgeId); - event.setTenantId(tenantId); - event.setType(DataConstants.EDGE_QUEUE_EVENT_TYPE); - event.setBody(mapper.valueToTree(queueEntry)); - ListenableFuture saveFuture = eventService.saveAsync(event); - - addMainCallback(saveFuture, callback); - } - - private void addMainCallback(ListenableFuture saveFuture, final FutureCallback callback) { - Futures.addCallback(saveFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable Event result) { - callback.onSuccess(null); - } - - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); - } - }, tsCallBackExecutor); - } - - private void pushRuleChainMetadataToEdge(TenantId tenantId, EdgeId edgeId, TbMsg tbMsg, FutureCallback callback) throws IOException { - RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class); - switch (tbMsg.getType()) { - case DataConstants.ENTITY_ASSIGNED_TO_EDGE: - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: - case DataConstants.ENTITY_UPDATED: - RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, ruleChain.getId()); - saveEventToEdgeQueue(tenantId, edgeId, EdgeQueueEntityType.RULE_CHAIN_METADATA, tbMsg.getType(), mapper.writeValueAsString(ruleChainMetaData), callback); - break; - default: - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); - } - } - - @Override - public TimePageData findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) { - return eventService.findEvents(tenantId, edgeId, DataConstants.EDGE_QUEUE_EVENT_TYPE, pageLink); - } - - @Override - public Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException { - edge.setRootRuleChainId(ruleChainId); - Edge savedEdge = saveEdge(edge); - RuleChain ruleChain = ruleChainService.findRuleChainById(tenantId, ruleChainId); - saveEventToEdgeQueue(tenantId, edge.getId(), EdgeQueueEntityType.RULE_CHAIN, DataConstants.ENTITY_UPDATED, mapper.writeValueAsString(ruleChain), new FutureCallback() { - @Override - public void onSuccess(@Nullable Void aVoid) { - log.debug("Event saved successfully!"); - } - - @Override - public void onFailure(Throwable t) { - log.debug("Failure during event save", t); - } - }); - return savedEdge; - } - @Override public void assignDefaultRuleChainsToEdge(TenantId tenantId, EdgeId edgeId) { log.trace("Executing assignDefaultRuleChainsToEdge, tenantId [{}], edgeId [{}]", tenantId, edgeId); @@ -713,7 +336,6 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic }, MoreExecutors.directExecutor()); } - private DataValidator edgeValidator = new DataValidator() {