Refactoring of notification to edge. Added EdgeNotificationService and queue msg proto

This commit is contained in:
Volodymyr Babak 2020-06-14 22:24:25 +03:00
parent e8afd97605
commit 600c9ec565
9 changed files with 490 additions and 393 deletions

View File

@ -0,0 +1,433 @@
package org.thingsboard.server.service.edge;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeQueueEntityType;
import org.thingsboard.server.common.data.edge.EdgeQueueEntry;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TimePageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.event.EventService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.TbCoreComponent;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Service
@TbCoreComponent
@Slf4j
public class DefaultEdgeNotificationService implements EdgeNotificationService {
private static final ObjectMapper mapper = new ObjectMapper();
@Autowired
private EdgeService edgeService;
@Autowired
private DeviceService deviceService;
@Autowired
private AssetService assetService;
@Autowired
private EntityViewService entityViewService;
@Autowired
private RuleChainService ruleChainService;
@Autowired
private RelationService relationService;
@Autowired
private EventService eventService;
private ExecutorService tsCallBackExecutor;
@PostConstruct
public void initExecutor() {
tsCallBackExecutor = Executors.newSingleThreadExecutor();
}
@PreDestroy
public void shutdownExecutor() {
if (tsCallBackExecutor != null) {
tsCallBackExecutor.shutdownNow();
}
}
@Override
public TimePageData<Event> findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) {
return eventService.findEvents(tenantId, edgeId, DataConstants.EDGE_QUEUE_EVENT_TYPE, pageLink);
}
@Override
public Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException {
edge.setRootRuleChainId(ruleChainId);
Edge savedEdge = edgeService.saveEdge(edge);
RuleChain ruleChain = ruleChainService.findRuleChainById(tenantId, ruleChainId);
saveEventToEdgeQueue(tenantId, edge.getId(), EdgeQueueEntityType.RULE_CHAIN, DataConstants.ENTITY_UPDATED, mapper.writeValueAsString(ruleChain), new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void aVoid) {
log.debug("Event saved successfully!");
}
@Override
public void onFailure(Throwable t) {
log.debug("Failure during event save", t);
}
});
return savedEdge;
}
private void saveEventToEdgeQueue(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, String type, String data, FutureCallback<Void> callback) throws IOException {
log.debug("Pushing single event to edge queue. tenantId [{}], edgeId [{}], entityType [{}], type[{}], data [{}]", tenantId, edgeId, entityType, type, data);
EdgeQueueEntry queueEntry = new EdgeQueueEntry();
queueEntry.setEntityType(entityType);
queueEntry.setType(type);
queueEntry.setData(data);
Event event = new Event();
event.setEntityId(edgeId);
event.setTenantId(tenantId);
event.setType(DataConstants.EDGE_QUEUE_EVENT_TYPE);
event.setBody(mapper.valueToTree(queueEntry));
ListenableFuture<Event> saveFuture = eventService.saveAsync(event);
addMainCallback(saveFuture, callback);
}
private void addMainCallback(ListenableFuture<Event> saveFuture, final FutureCallback<Void> callback) {
Futures.addCallback(saveFuture, new FutureCallback<Event>() {
@Override
public void onSuccess(@Nullable Event result) {
callback.onSuccess(null);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}, tsCallBackExecutor);
}
@Override
public void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback) {
if (tbMsg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name()) ||
tbMsg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ||
tbMsg.getType().equals(DataConstants.ATTRIBUTES_UPDATED) ||
tbMsg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) {
processCustomTbMsg(tenantId, tbMsg, callback);
} else {
try {
switch (tbMsg.getOriginator().getEntityType()) {
case EDGE:
processEdge(tenantId, tbMsg, callback);
break;
case ASSET:
processAsset(tenantId, tbMsg, callback);
break;
case DEVICE:
processDevice(tenantId, tbMsg, callback);
break;
case DASHBOARD:
processDashboard(tenantId, tbMsg, callback);
break;
case RULE_CHAIN:
processRuleChain(tenantId, tbMsg, callback);
break;
case ENTITY_VIEW:
processEntityView(tenantId, tbMsg, callback);
break;
case ALARM:
processAlarm(tenantId, tbMsg, callback);
break;
default:
log.debug("Entity type [{}] is not designed to be pushed to edge", tbMsg.getOriginator().getEntityType());
}
} catch (IOException e) {
log.error("Can't push to edge updates, entity type [{}], data [{}]", tbMsg.getOriginator().getEntityType(), tbMsg.getData(), e);
}
}
}
private void processCustomTbMsg(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) {
ListenableFuture<EdgeId> edgeIdFuture = getEdgeIdByOriginatorId(tenantId, tbMsg.getOriginator());
Futures.transform(edgeIdFuture, edgeId -> {
EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(tbMsg.getOriginator().getEntityType());
if (edgeId != null && edgeQueueEntityType != null) {
try {
saveEventToEdgeQueue(tenantId, edgeId, edgeQueueEntityType, tbMsg.getType(), Base64.encodeBase64String(TbMsg.toByteArray(tbMsg)), callback);
} catch (IOException e) {
log.error("Error while saving custom tbMsg into Edge Queue", e);
}
}
return null;
}, MoreExecutors.directExecutor());
}
private void processDevice(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DEVICE, callback);
break;
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
Device device = mapper.readValue(tbMsg.getData(), Device.class);
pushEventToEdge(tenantId, device.getId(), EdgeQueueEntityType.DEVICE, tbMsg, callback);
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
// TODO: voba - handle properly edge creation
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processAsset(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ASSET, callback);
break;
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
Asset asset = mapper.readValue(tbMsg.getData(), Asset.class);
pushEventToEdge(tenantId, asset.getId(), EdgeQueueEntityType.ASSET, tbMsg, callback);
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processEntityView(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ENTITY_VIEW, callback);
break;
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
EntityView entityView = mapper.readValue(tbMsg.getData(), EntityView.class);
pushEventToEdge(tenantId, entityView.getId(), EdgeQueueEntityType.ENTITY_VIEW, tbMsg, callback);
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processAlarm(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
case DataConstants.ALARM_ACK:
case DataConstants.ALARM_CLEAR:
Alarm alarm = mapper.readValue(tbMsg.getData(), Alarm.class);
EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType());
if (edgeQueueEntityType != null) {
pushEventToEdge(tenantId, alarm.getOriginator(), EdgeQueueEntityType.ALARM, tbMsg, callback);
}
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processDashboard(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DASHBOARD, callback);
break;
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
Dashboard dashboard = mapper.readValue(tbMsg.getData(), Dashboard.class);
ListenableFuture<TimePageData<Edge>> future = edgeService.findEdgesByTenantIdAndDashboardId(tenantId, dashboard.getId(), new TimePageLink(Integer.MAX_VALUE));
Futures.transform(future, edges -> {
if (edges != null && edges.getData() != null && !edges.getData().isEmpty()) {
try {
for (Edge edge : edges.getData()) {
pushEventToEdge(tenantId, edge.getId(), EdgeQueueEntityType.DASHBOARD, tbMsg, callback);
}
} catch (IOException e) {
log.error("Can't push event to edge", e);
}
}
return null;
}, MoreExecutors.directExecutor());
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processRuleChain(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.RULE_CHAIN, callback);
break;
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class);
if (RuleChainType.EDGE.equals(ruleChain.getType())) {
ListenableFuture<TimePageData<Edge>> future = edgeService.findEdgesByTenantIdAndRuleChainId(tenantId, ruleChain.getId(), new TimePageLink(Integer.MAX_VALUE));
Futures.transform(future, edges -> {
if (edges != null && edges.getData() != null && !edges.getData().isEmpty()) {
try {
for (Edge edge : edges.getData()) {
pushEventToEdge(tenantId, edge.getId(), EdgeQueueEntityType.RULE_CHAIN, tbMsg, callback);
}
} catch (IOException e) {
log.error("Can't push event to edge", e);
}
}
return null;
}, MoreExecutors.directExecutor());
}
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EdgeQueueEntityType entityType, FutureCallback<Void> callback) throws IOException {
EdgeId edgeId;
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("assignedEdgeId")));
pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback);
break;
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("unassignedEdgeId")));
pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback);
break;
}
}
private void pushEventToEdge(TenantId tenantId, EntityId originatorId, EdgeQueueEntityType edgeQueueEntityType, TbMsg tbMsg, FutureCallback<Void> callback) {
ListenableFuture<EdgeId> edgeIdFuture = getEdgeIdByOriginatorId(tenantId, originatorId);
Futures.transform(edgeIdFuture, edgeId -> {
if (edgeId != null) {
try {
pushEventToEdge(tenantId, edgeId, edgeQueueEntityType, tbMsg, callback);
} catch (Exception e) {
log.error("Failed to push event to edge, edgeId [{}], tbMsg [{}]", edgeId, tbMsg, e);
}
}
return null;
},
MoreExecutors.directExecutor());
}
private ListenableFuture<EdgeId> getEdgeIdByOriginatorId(TenantId tenantId, EntityId originatorId) {
List<EntityRelation> originatorEdgeRelations = relationService.findByToAndType(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0) {
return Futures.immediateFuture(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId()));
} else {
return Futures.immediateFuture(null);
}
}
private void pushEventToEdge(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
log.debug("Pushing event(s) to edge queue. tenantId [{}], edgeId [{}], entityType [{}], tbMsg [{}]", tenantId, edgeId, entityType, tbMsg);
saveEventToEdgeQueue(tenantId, edgeId, entityType, tbMsg.getType(), tbMsg.getData(), callback);
if (entityType.equals(EdgeQueueEntityType.RULE_CHAIN)) {
pushRuleChainMetadataToEdge(tenantId, edgeId, tbMsg, callback);
}
}
private void pushRuleChainMetadataToEdge(TenantId tenantId, EdgeId edgeId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class);
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
case DataConstants.ENTITY_UPDATED:
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, ruleChain.getId());
saveEventToEdgeQueue(tenantId, edgeId, EdgeQueueEntityType.RULE_CHAIN_METADATA, tbMsg.getType(), mapper.writeValueAsString(ruleChainMetaData), callback);
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private EdgeQueueEntityType getEdgeQueueTypeByEntityType(EntityType entityType) {
switch (entityType) {
case DEVICE:
return EdgeQueueEntityType.DEVICE;
case ASSET:
return EdgeQueueEntityType.ASSET;
case ENTITY_VIEW:
return EdgeQueueEntityType.ENTITY_VIEW;
default:
log.info("Unsupported entity type: [{}]", entityType);
return null;
}
}
}

View File

@ -53,6 +53,10 @@ public class EdgeContextComponent {
@Autowired @Autowired
private EdgeService edgeService; private EdgeService edgeService;
@Lazy
@Autowired
private EdgeNotificationService edgeNotificationService;
@Lazy @Lazy
@Autowired @Autowired
private AssetService assetService; private AssetService assetService;

View File

@ -0,0 +1,22 @@
package org.thingsboard.server.service.edge;
import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TimePageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.io.IOException;
public interface EdgeNotificationService {
TimePageData<Event> findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink);
Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException;
void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback);
}

View File

@ -172,7 +172,7 @@ public final class EdgeGrpcSession implements Closeable {
TimePageData<Event> pageData; TimePageData<Event> pageData;
UUID ifOffset = null; UUID ifOffset = null;
do { do {
pageData = ctx.getEdgeService().findQueueEvents(edge.getTenantId(), edge.getId(), pageLink); pageData = ctx.getEdgeNotificationService().findQueueEvents(edge.getTenantId(), edge.getId(), pageLink);
if (isConnected() && !pageData.getData().isEmpty()) { if (isConnected() && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size()); log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());
for (Event event : pageData.getData()) { for (Event event : pageData.getData()) {

View File

@ -28,6 +28,7 @@ import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.EdgeNotificationMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto;
@ -42,6 +43,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.EdgeNotificationService;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService; import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
@ -82,18 +84,20 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private final TbLocalSubscriptionService localSubscriptionService; private final TbLocalSubscriptionService localSubscriptionService;
private final SubscriptionManagerService subscriptionManagerService; private final SubscriptionManagerService subscriptionManagerService;
private final TbCoreDeviceRpcService tbCoreDeviceRpcService; private final TbCoreDeviceRpcService tbCoreDeviceRpcService;
private final EdgeNotificationService edgeNotificationService;
private final TbCoreConsumerStats stats = new TbCoreConsumerStats(); private final TbCoreConsumerStats stats = new TbCoreConsumerStats();
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext, public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext,
DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService, DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService,
SubscriptionManagerService subscriptionManagerService, DataDecodingEncodingService encodingService, SubscriptionManagerService subscriptionManagerService, DataDecodingEncodingService encodingService,
TbCoreDeviceRpcService tbCoreDeviceRpcService) { TbCoreDeviceRpcService tbCoreDeviceRpcService, EdgeNotificationService edgeNotificationService) {
super(actorContext, encodingService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer()); super(actorContext, encodingService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer(); this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
this.stateService = stateService; this.stateService = stateService;
this.localSubscriptionService = localSubscriptionService; this.localSubscriptionService = localSubscriptionService;
this.subscriptionManagerService = subscriptionManagerService; this.subscriptionManagerService = subscriptionManagerService;
this.tbCoreDeviceRpcService = tbCoreDeviceRpcService; this.tbCoreDeviceRpcService = tbCoreDeviceRpcService;
this.edgeNotificationService = edgeNotificationService;
} }
@PostConstruct @PostConstruct
@ -142,6 +146,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreMsg.hasDeviceStateServiceMsg()) { } else if (toCoreMsg.hasDeviceStateServiceMsg()) {
log.trace("[{}] Forwarding message to state service {}", id, toCoreMsg.getDeviceStateServiceMsg()); log.trace("[{}] Forwarding message to state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback); forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
} else if (toCoreMsg.hasEdgeNotificationMsg()) {
log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg());
forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback);
} else if (toCoreMsg.getToDeviceActorNotificationMsg() != null && !toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) { } else if (toCoreMsg.getToDeviceActorNotificationMsg() != null && !toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) {
Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
if (actorMsg.isPresent()) { if (actorMsg.isPresent()) {
@ -275,6 +282,13 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
stateService.onQueueMsg(deviceStateServiceMsg, callback); stateService.onQueueMsg(deviceStateServiceMsg, callback);
} }
private void forwardToEdgeNotificationService(EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(edgeNotificationMsg);
}
edgeNotificationService.pushNotificationToEdge(edgeNotificationMsg, callback);
}
private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbCallback callback) { private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbCallback callback) {
if (statsEnabled) { if (statsEnabled) {
stats.log(toDeviceActorMsg); stats.log(toDeviceActorMsg);

View File

@ -33,6 +33,7 @@ public class TbCoreConsumerStats {
private final AtomicInteger claimDeviceCounter = new AtomicInteger(0); private final AtomicInteger claimDeviceCounter = new AtomicInteger(0);
private final AtomicInteger deviceStateCounter = new AtomicInteger(0); private final AtomicInteger deviceStateCounter = new AtomicInteger(0);
private final AtomicInteger edgeNotificationCounter = new AtomicInteger(0);
private final AtomicInteger subscriptionMsgCounter = new AtomicInteger(0); private final AtomicInteger subscriptionMsgCounter = new AtomicInteger(0);
private final AtomicInteger toCoreNotificationsCounter = new AtomicInteger(0); private final AtomicInteger toCoreNotificationsCounter = new AtomicInteger(0);
@ -66,6 +67,11 @@ public class TbCoreConsumerStats {
deviceStateCounter.incrementAndGet(); deviceStateCounter.incrementAndGet();
} }
public void log(TransportProtos.EdgeNotificationMsgProto msg) {
totalCounter.incrementAndGet();
edgeNotificationCounter.incrementAndGet();
}
public void log(TransportProtos.SubscriptionMgrMsgProto msg) { public void log(TransportProtos.SubscriptionMgrMsgProto msg) {
totalCounter.incrementAndGet(); totalCounter.incrementAndGet();
subscriptionMsgCounter.incrementAndGet(); subscriptionMsgCounter.incrementAndGet();

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -15,10 +15,8 @@
*/ */
package org.thingsboard.server.dao.edge; package org.thingsboard.server.dao.edge;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeSearchQuery; import org.thingsboard.server.common.data.edge.EdgeSearchQuery;
import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.CustomerId;
@ -30,13 +28,9 @@ import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.page.TimePageData; import org.thingsboard.server.common.data.page.TimePageData;
import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.msg.TbMsg;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID;
public interface EdgeService { public interface EdgeService {
@ -76,11 +70,6 @@ public interface EdgeService {
ListenableFuture<List<EntitySubtype>> findEdgeTypesByTenantId(TenantId tenantId); ListenableFuture<List<EntitySubtype>> findEdgeTypesByTenantId(TenantId tenantId);
void pushEventToEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback);
TimePageData<Event> findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink);
Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException;
void assignDefaultRuleChainsToEdge(TenantId tenantId, EdgeId edgeId); void assignDefaultRuleChainsToEdge(TenantId tenantId, EdgeId edgeId);

View File

@ -352,6 +352,12 @@ message FromDeviceRPCResponseProto {
string response = 3; string response = 3;
int32 error = 4; int32 error = 4;
} }
message EdgeNotificationMsgProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
}
/** /**
* Main messages; * Main messages;
*/ */
@ -377,6 +383,7 @@ message ToCoreMsg {
DeviceStateServiceMsgProto deviceStateServiceMsg = 2; DeviceStateServiceMsgProto deviceStateServiceMsg = 2;
SubscriptionMgrMsgProto toSubscriptionMgrMsg = 3; SubscriptionMgrMsgProto toSubscriptionMgrMsg = 3;
bytes toDeviceActorNotificationMsg = 4; bytes toDeviceActorNotificationMsg = 4;
EdgeNotificationMsgProto edgeNotificationMsg = 5;
} }
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */ /* High priority messages with low latency are handled by ThingsBoard Core Service separately */

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -15,14 +15,11 @@
*/ */
package org.thingsboard.server.dao.edge; package org.thingsboard.server.dao.edge;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache; import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager; import org.springframework.cache.CacheManager;
@ -31,19 +28,10 @@ import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.Event;
import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeQueueEntityType;
import org.thingsboard.server.common.data.edge.EdgeQueueEntry;
import org.thingsboard.server.common.data.edge.EdgeSearchQuery; import org.thingsboard.server.common.data.edge.EdgeSearchQuery;
import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.DashboardId;
@ -57,19 +45,10 @@ import org.thingsboard.server.common.data.page.TimePageData;
import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection; import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.customer.CustomerDao; import org.thingsboard.server.dao.customer.CustomerDao;
import org.thingsboard.server.dao.dashboard.DashboardService; import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.entity.AbstractEntityService; import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.event.EventService;
import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.rule.RuleChainService;
@ -79,17 +58,11 @@ import org.thingsboard.server.dao.service.Validator;
import org.thingsboard.server.dao.tenant.TenantDao; import org.thingsboard.server.dao.tenant.TenantDao;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE; import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE;
@ -104,8 +77,6 @@ import static org.thingsboard.server.dao.service.Validator.validateString;
@Slf4j @Slf4j
public class EdgeServiceImpl extends AbstractEntityService implements EdgeService { public class EdgeServiceImpl extends AbstractEntityService implements EdgeService {
private static final ObjectMapper mapper = new ObjectMapper();
public static final String INCORRECT_TENANT_ID = "Incorrect tenantId "; public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
public static final String INCORRECT_PAGE_LINK = "Incorrect page link "; public static final String INCORRECT_PAGE_LINK = "Incorrect page link ";
public static final String INCORRECT_CUSTOMER_ID = "Incorrect customerId "; public static final String INCORRECT_CUSTOMER_ID = "Incorrect customerId ";
@ -123,41 +94,15 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
@Autowired @Autowired
private CacheManager cacheManager; private CacheManager cacheManager;
@Autowired
private EventService eventService;
@Autowired @Autowired
private DashboardService dashboardService; private DashboardService dashboardService;
@Autowired @Autowired
private RuleChainService ruleChainService; private RuleChainService ruleChainService;
@Autowired
private DeviceService deviceService;
@Autowired
private AssetService assetService;
@Autowired
private EntityViewService entityViewService;
@Autowired @Autowired
private RelationService relationService; private RelationService relationService;
private ExecutorService tsCallBackExecutor;
@PostConstruct
public void initExecutor() {
tsCallBackExecutor = Executors.newSingleThreadExecutor();
}
@PreDestroy
public void shutdownExecutor() {
if (tsCallBackExecutor != null) {
tsCallBackExecutor.shutdownNow();
}
}
@Override @Override
public Edge findEdgeById(TenantId tenantId, EdgeId edgeId) { public Edge findEdgeById(TenantId tenantId, EdgeId edgeId) {
log.trace("Executing findEdgeById [{}]", edgeId); log.trace("Executing findEdgeById [{}]", edgeId);
@ -259,7 +204,6 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
return edgeDao.findEdgesByTenantIdAndIdsAsync(tenantId.getId(), toUUIDs(edgeIds)); return edgeDao.findEdgesByTenantIdAndIdsAsync(tenantId.getId(), toUUIDs(edgeIds));
} }
@Override @Override
public void deleteEdgesByTenantId(TenantId tenantId) { public void deleteEdgesByTenantId(TenantId tenantId) {
log.trace("Executing deleteEdgesByTenantId, tenantId [{}]", tenantId); log.trace("Executing deleteEdgesByTenantId, tenantId [{}]", tenantId);
@ -344,327 +288,6 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} }
@Override
public void pushEventToEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) {
if (tbMsg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name()) ||
tbMsg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ||
tbMsg.getType().equals(DataConstants.ATTRIBUTES_UPDATED) ||
tbMsg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) {
processCustomTbMsg(tenantId, tbMsg, callback);
} else {
try {
switch (tbMsg.getOriginator().getEntityType()) {
case EDGE:
processEdge(tenantId, tbMsg, callback);
break;
case ASSET:
processAsset(tenantId, tbMsg, callback);
break;
case DEVICE:
processDevice(tenantId, tbMsg, callback);
break;
case DASHBOARD:
processDashboard(tenantId, tbMsg, callback);
break;
case RULE_CHAIN:
processRuleChain(tenantId, tbMsg, callback);
break;
case ENTITY_VIEW:
processEntityView(tenantId, tbMsg, callback);
break;
case ALARM:
processAlarm(tenantId, tbMsg, callback);
break;
default:
log.debug("Entity type [{}] is not designed to be pushed to edge", tbMsg.getOriginator().getEntityType());
}
} catch (IOException e) {
log.error("Can't push to edge updates, entity type [{}], data [{}]", tbMsg.getOriginator().getEntityType(), tbMsg.getData(), e);
}
}
}
private void processCustomTbMsg(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) {
ListenableFuture<EdgeId> edgeIdFuture = getEdgeIdByOriginatorId(tenantId, tbMsg.getOriginator());
Futures.transform(edgeIdFuture, edgeId -> {
EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(tbMsg.getOriginator().getEntityType());
if (edgeId != null && edgeQueueEntityType != null) {
try {
saveEventToEdgeQueue(tenantId, edgeId, edgeQueueEntityType, tbMsg.getType(), Base64.encodeBase64String(TbMsg.toByteArray(tbMsg)), callback);
} catch (IOException e) {
log.error("Error while saving custom tbMsg into Edge Queue", e);
}
}
return null;
}, MoreExecutors.directExecutor());
}
private EdgeQueueEntityType getEdgeQueueTypeByEntityType(EntityType entityType) {
switch (entityType) {
case DEVICE:
return EdgeQueueEntityType.DEVICE;
case ASSET:
return EdgeQueueEntityType.ASSET;
case ENTITY_VIEW:
return EdgeQueueEntityType.ENTITY_VIEW;
default:
log.info("Unsupported entity type: [{}]", entityType);
return null;
}
}
private ListenableFuture<EdgeId> getEdgeIdByOriginatorId(TenantId tenantId, EntityId originatorId) {
List<EntityRelation> originatorEdgeRelations = relationService.findByToAndType(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0) {
return Futures.immediateFuture(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId()));
} else {
return Futures.immediateFuture(null);
}
}
private void pushEventToEdge(TenantId tenantId, EntityId originatorId, EdgeQueueEntityType edgeQueueEntityType, TbMsg tbMsg, FutureCallback<Void> callback) {
ListenableFuture<EdgeId> edgeIdFuture = getEdgeIdByOriginatorId(tenantId, originatorId);
Futures.transform(edgeIdFuture, edgeId -> {
if (edgeId != null) {
try {
pushEventToEdge(tenantId, edgeId, edgeQueueEntityType, tbMsg, callback);
} catch (Exception e) {
log.error("Failed to push event to edge, edgeId [{}], tbMsg [{}]", edgeId, tbMsg, e);
}
}
return null;
},
MoreExecutors.directExecutor());
}
private void processDevice(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DEVICE, callback);
break;
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
Device device = mapper.readValue(tbMsg.getData(), Device.class);
pushEventToEdge(tenantId, device.getId(), EdgeQueueEntityType.DEVICE, tbMsg, callback);
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
// TODO: voba - handle properly edge creation
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processAsset(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ASSET, callback);
break;
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
Asset asset = mapper.readValue(tbMsg.getData(), Asset.class);
pushEventToEdge(tenantId, asset.getId(), EdgeQueueEntityType.ASSET, tbMsg, callback);
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processEntityView(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ENTITY_VIEW, callback);
break;
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
EntityView entityView = mapper.readValue(tbMsg.getData(), EntityView.class);
pushEventToEdge(tenantId, entityView.getId(), EdgeQueueEntityType.ENTITY_VIEW, tbMsg, callback);
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processAlarm(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
case DataConstants.ALARM_ACK:
case DataConstants.ALARM_CLEAR:
Alarm alarm = mapper.readValue(tbMsg.getData(), Alarm.class);
EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType());
if (edgeQueueEntityType != null) {
pushEventToEdge(tenantId, alarm.getOriginator(), EdgeQueueEntityType.ALARM, tbMsg, callback);
}
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processDashboard(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DASHBOARD, callback);
}
private void processRuleChain(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.RULE_CHAIN, callback);
break;
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class);
if (RuleChainType.EDGE.equals(ruleChain.getType())) {
ListenableFuture<TimePageData<Edge>> future = findEdgesByTenantIdAndRuleChainId(tenantId, ruleChain.getId(), new TimePageLink(Integer.MAX_VALUE));
Futures.transform(future, edges -> {
if (edges != null && edges.getData() != null && !edges.getData().isEmpty()) {
try {
for (Edge edge : edges.getData()) {
pushEventToEdge(tenantId, edge.getId(), EdgeQueueEntityType.RULE_CHAIN, tbMsg, callback);
}
} catch (IOException e) {
log.error("Can't push event to edge", e);
}
}
return null;
}, MoreExecutors.directExecutor());
}
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EdgeQueueEntityType entityType, FutureCallback<Void> callback) throws IOException {
EdgeId edgeId;
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("assignedEdgeId")));
pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback);
break;
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("unassignedEdgeId")));
pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback);
break;
case DataConstants.ENTITY_DELETED:
case DataConstants.ENTITY_CREATED:
case DataConstants.ENTITY_UPDATED:
Dashboard dashboard = mapper.readValue(tbMsg.getData(), Dashboard.class);
ListenableFuture<TimePageData<Edge>> future = findEdgesByTenantIdAndDashboardId(tenantId, dashboard.getId(), new TimePageLink(Integer.MAX_VALUE));
Futures.transform(future, edges -> {
if (edges != null && edges.getData() != null && !edges.getData().isEmpty()) {
try {
for (Edge edge : edges.getData()) {
pushEventToEdge(tenantId, edge.getId(), EdgeQueueEntityType.DASHBOARD, tbMsg, callback);
}
} catch (IOException e) {
log.error("Can't push event to edge", e);
}
}
return null;
}, MoreExecutors.directExecutor());
break;
}
}
private void pushEventToEdge(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
log.debug("Pushing event(s) to edge queue. tenantId [{}], edgeId [{}], entityType [{}], tbMsg [{}]", tenantId, edgeId, entityType, tbMsg);
saveEventToEdgeQueue(tenantId, edgeId, entityType, tbMsg.getType(), tbMsg.getData(), callback);
if (entityType.equals(EdgeQueueEntityType.RULE_CHAIN)) {
pushRuleChainMetadataToEdge(tenantId, edgeId, tbMsg, callback);
}
}
private void saveEventToEdgeQueue(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, String type, String data, FutureCallback<Void> callback) throws IOException {
log.debug("Pushing single event to edge queue. tenantId [{}], edgeId [{}], entityType [{}], type[{}], data [{}]", tenantId, edgeId, entityType, type, data);
EdgeQueueEntry queueEntry = new EdgeQueueEntry();
queueEntry.setEntityType(entityType);
queueEntry.setType(type);
queueEntry.setData(data);
Event event = new Event();
event.setEntityId(edgeId);
event.setTenantId(tenantId);
event.setType(DataConstants.EDGE_QUEUE_EVENT_TYPE);
event.setBody(mapper.valueToTree(queueEntry));
ListenableFuture<Event> saveFuture = eventService.saveAsync(event);
addMainCallback(saveFuture, callback);
}
private void addMainCallback(ListenableFuture<Event> saveFuture, final FutureCallback<Void> callback) {
Futures.addCallback(saveFuture, new FutureCallback<Event>() {
@Override
public void onSuccess(@Nullable Event result) {
callback.onSuccess(null);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}, tsCallBackExecutor);
}
private void pushRuleChainMetadataToEdge(TenantId tenantId, EdgeId edgeId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class);
switch (tbMsg.getType()) {
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
case DataConstants.ENTITY_UPDATED:
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, ruleChain.getId());
saveEventToEdgeQueue(tenantId, edgeId, EdgeQueueEntityType.RULE_CHAIN_METADATA, tbMsg.getType(), mapper.writeValueAsString(ruleChainMetaData), callback);
break;
default:
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
}
}
@Override
public TimePageData<Event> findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) {
return eventService.findEvents(tenantId, edgeId, DataConstants.EDGE_QUEUE_EVENT_TYPE, pageLink);
}
@Override
public Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException {
edge.setRootRuleChainId(ruleChainId);
Edge savedEdge = saveEdge(edge);
RuleChain ruleChain = ruleChainService.findRuleChainById(tenantId, ruleChainId);
saveEventToEdgeQueue(tenantId, edge.getId(), EdgeQueueEntityType.RULE_CHAIN, DataConstants.ENTITY_UPDATED, mapper.writeValueAsString(ruleChain), new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void aVoid) {
log.debug("Event saved successfully!");
}
@Override
public void onFailure(Throwable t) {
log.debug("Failure during event save", t);
}
});
return savedEdge;
}
@Override @Override
public void assignDefaultRuleChainsToEdge(TenantId tenantId, EdgeId edgeId) { public void assignDefaultRuleChainsToEdge(TenantId tenantId, EdgeId edgeId) {
log.trace("Executing assignDefaultRuleChainsToEdge, tenantId [{}], edgeId [{}]", tenantId, edgeId); log.trace("Executing assignDefaultRuleChainsToEdge, tenantId [{}], edgeId [{}]", tenantId, edgeId);
@ -713,7 +336,6 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
}, MoreExecutors.directExecutor()); }, MoreExecutors.directExecutor());
} }
private DataValidator<Edge> edgeValidator = private DataValidator<Edge> edgeValidator =
new DataValidator<Edge>() { new DataValidator<Edge>() {