Added ALARM entity support
This commit is contained in:
parent
c99cf51ca6
commit
acfaf68042
@ -350,6 +350,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
||||
case DataConstants.ENTITY_DELETED:
|
||||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
|
||||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
|
||||
case DataConstants.ALARM_ACK:
|
||||
case DataConstants.ALARM_CLEAR:
|
||||
edgeService.pushEventToEdge(tenantId, msg, new FutureCallback<Void>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Void aVoid) {
|
||||
|
||||
@ -25,20 +25,9 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.actors.service.ActorService;
|
||||
import org.thingsboard.server.common.data.Event;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.TimePageData;
|
||||
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
import org.thingsboard.server.dao.asset.AssetService;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.dao.edge.EdgeService;
|
||||
import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||
import org.thingsboard.server.dao.event.EventService;
|
||||
import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc;
|
||||
import org.thingsboard.server.gen.edge.RequestMsg;
|
||||
import org.thingsboard.server.gen.edge.ResponseMsg;
|
||||
@ -52,7 +41,6 @@ import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@ -74,24 +62,6 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
|
||||
@Autowired
|
||||
private EdgeContextComponent ctx;
|
||||
|
||||
@Autowired
|
||||
private EdgeService edgeService;
|
||||
|
||||
@Autowired
|
||||
private AssetService assetService;
|
||||
|
||||
@Autowired
|
||||
private DeviceService deviceService;
|
||||
|
||||
@Autowired
|
||||
private EntityViewService entityViewService;
|
||||
|
||||
@Autowired
|
||||
private AttributesService attributesService;
|
||||
|
||||
@Autowired
|
||||
private ActorService actorService;
|
||||
|
||||
private Server server;
|
||||
|
||||
private ExecutorService executor;
|
||||
@ -156,4 +126,5 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
|
||||
private void onEdgeDisconnect(EdgeId edgeId) {
|
||||
sessions.remove(edgeId);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.EntityView;
|
||||
import org.thingsboard.server.common.data.Event;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeQueueEntry;
|
||||
@ -59,6 +60,7 @@ import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
|
||||
import org.thingsboard.server.common.msg.session.SessionMsgType;
|
||||
import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
|
||||
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
|
||||
import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.AssetUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.ConnectRequestMsg;
|
||||
import org.thingsboard.server.gen.edge.ConnectResponseCode;
|
||||
@ -68,6 +70,7 @@ import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.EdgeConfiguration;
|
||||
import org.thingsboard.server.gen.edge.EntityDataProto;
|
||||
import org.thingsboard.server.gen.edge.EntityUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.NodeConnectionInfoProto;
|
||||
import org.thingsboard.server.gen.edge.RequestMsg;
|
||||
@ -140,11 +143,6 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
.setUplinkResponseMsg(processUplinkMsg(requestMsg.getUplinkMsg()))
|
||||
.build());
|
||||
}
|
||||
if (requestMsg.getMsgType().equals(RequestMsgType.DEVICE_UPDATE_RPC_MESSAGE) && requestMsg.hasDeviceUpdateMsg()) {
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
.setUplinkResponseMsg(processUplinkMsg(requestMsg.getUplinkMsg()))
|
||||
.build());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -161,6 +159,7 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
void processHandleMessages() throws ExecutionException, InterruptedException {
|
||||
Long queueStartTs = getQueueStartTs().get();
|
||||
// TODO: this 100 value must be changed properly
|
||||
@ -170,7 +169,6 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
do {
|
||||
pageData = ctx.getEdgeService().findQueueEvents(edge.getTenantId(), edge.getId(), pageLink);
|
||||
if (!pageData.getData().isEmpty()) {
|
||||
edge = ctx.getEdgeService().findEdgeById(edge.getTenantId(), edge.getId());
|
||||
for (Event event : pageData.getData()) {
|
||||
EdgeQueueEntry entry;
|
||||
try {
|
||||
@ -181,6 +179,8 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
case ENTITY_DELETED_RPC_MESSAGE:
|
||||
case ENTITY_UPDATED_RPC_MESSAGE:
|
||||
case ENTITY_CREATED_RPC_MESSAGE:
|
||||
case ALARM_ACK_RPC_MESSAGE:
|
||||
case ALARM_CLEARK_RPC_MESSAGE:
|
||||
processEntityCRUDMessage(entry, msgType);
|
||||
break;
|
||||
case RULE_CHAIN_CUSTOM_MESSAGE:
|
||||
@ -239,6 +239,10 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
private void processEntityCRUDMessage(EdgeQueueEntry entry, UpdateMsgType msgType) throws java.io.IOException {
|
||||
log.trace("Executing processEntityCRUDMessage, entry [{}], msgType [{}]", entry, msgType);
|
||||
switch (entry.getEntityType()) {
|
||||
case EDGE:
|
||||
Edge edge = objectMapper.readValue(entry.getData(), Edge.class);
|
||||
onEdgeUpdated(msgType, edge);
|
||||
break;
|
||||
case DEVICE:
|
||||
Device device = objectMapper.readValue(entry.getData(), Device.class);
|
||||
onDeviceUpdated(msgType, device);
|
||||
@ -263,6 +267,10 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
RuleChainMetaData ruleChainMetaData = objectMapper.readValue(entry.getData(), RuleChainMetaData.class);
|
||||
onRuleChainMetadataUpdated(msgType, ruleChainMetaData);
|
||||
break;
|
||||
case ALARM:
|
||||
Alarm alarm = objectMapper.readValue(entry.getData(), Alarm.class);
|
||||
onAlarmUpdated(msgType, alarm);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -284,45 +292,107 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
} );
|
||||
}
|
||||
|
||||
private void onEdgeUpdated(UpdateMsgType msgType, Edge edge) {
|
||||
// TODO: voba add configuration update to edge
|
||||
this.edge = edge;
|
||||
}
|
||||
|
||||
private void onDeviceUpdated(UpdateMsgType msgType, Device device) {
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
|
||||
.setDeviceUpdateMsg(constructDeviceUpdatedMsg(msgType, device))
|
||||
.build();
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
.setEntityUpdateMsg(entityUpdateMsg)
|
||||
.build());
|
||||
}
|
||||
|
||||
private void onAssetUpdated(UpdateMsgType msgType, Asset asset) {
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
|
||||
.setAssetUpdateMsg(constructAssetUpdatedMsg(msgType, asset))
|
||||
.build();
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
.setEntityUpdateMsg(entityUpdateMsg)
|
||||
.build());
|
||||
}
|
||||
|
||||
private void onEntityViewUpdated(UpdateMsgType msgType, EntityView entityView) {
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
|
||||
.setEntityViewUpdateMsg(constructEntityViewUpdatedMsg(msgType, entityView))
|
||||
.build();
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
.setEntityUpdateMsg(entityUpdateMsg)
|
||||
.build());
|
||||
}
|
||||
|
||||
private void onRuleChainUpdated(UpdateMsgType msgType, RuleChain ruleChain) {
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
|
||||
.setRuleChainUpdateMsg(constructRuleChainUpdatedMsg(msgType, ruleChain))
|
||||
.build();
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
.setEntityUpdateMsg(entityUpdateMsg)
|
||||
.build());
|
||||
}
|
||||
|
||||
private void onRuleChainMetadataUpdated(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) {
|
||||
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData);
|
||||
if (ruleChainMetadataUpdateMsg != null) {
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
|
||||
.setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg)
|
||||
.build();
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
.setEntityUpdateMsg(entityUpdateMsg)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
private void onDashboardUpdated(UpdateMsgType msgType, Dashboard dashboard) {
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
|
||||
.setDashboardUpdateMsg(constructDashboardUpdatedMsg(msgType, dashboard))
|
||||
.build();
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
.setEntityUpdateMsg(entityUpdateMsg)
|
||||
.build());
|
||||
}
|
||||
|
||||
private void onAlarmUpdated(UpdateMsgType msgType, Alarm alarm) {
|
||||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
|
||||
.setAlarmUpdateMsg(constructAlarmUpdatedMsg(msgType, alarm))
|
||||
.build();
|
||||
outputStream.onNext(ResponseMsg.newBuilder()
|
||||
.setEntityUpdateMsg(entityUpdateMsg)
|
||||
.build());
|
||||
}
|
||||
|
||||
private AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm) {
|
||||
String entityName = null;
|
||||
switch (alarm.getOriginator().getEntityType()) {
|
||||
case DEVICE:
|
||||
entityName = ctx.getDeviceService().findDeviceById(edge.getTenantId(), new DeviceId(alarm.getOriginator().getId())).getName();
|
||||
break;
|
||||
case ASSET:
|
||||
entityName = ctx.getAssetService().findAssetById(edge.getTenantId(), new AssetId(alarm.getOriginator().getId())).getName();
|
||||
break;
|
||||
case ENTITY_VIEW:
|
||||
entityName = ctx.getEntityViewService().findEntityViewById(edge.getTenantId(), new EntityViewId(alarm.getOriginator().getId())).getName();
|
||||
break;
|
||||
}
|
||||
AlarmUpdateMsg.Builder builder = AlarmUpdateMsg.newBuilder()
|
||||
.setMsgType(msgType)
|
||||
.setName(alarm.getName())
|
||||
.setType(alarm.getName())
|
||||
.setOriginatorName(entityName)
|
||||
.setOriginatorType(alarm.getOriginator().getEntityType().name())
|
||||
.setSeverity(alarm.getSeverity().name())
|
||||
.setStatus(alarm.getStatus().name())
|
||||
.setStartTs(alarm.getStartTs())
|
||||
.setEndTs(alarm.getEndTs())
|
||||
.setAckTs(alarm.getAckTs())
|
||||
.setClearTs(alarm.getClearTs())
|
||||
.setDetails(JacksonUtil.toString(alarm.getDetails()))
|
||||
.setPropagate(alarm.isPropagate());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private UpdateMsgType getResponseMsgType(String msgType) {
|
||||
if (msgType.equals(SessionMsgType.POST_TELEMETRY_REQUEST.name()) ||
|
||||
msgType.equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ||
|
||||
@ -504,31 +574,31 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
if (uplinkMsg.getEntityDataList() != null && !uplinkMsg.getEntityDataList().isEmpty()) {
|
||||
for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) {
|
||||
TbMsg tbMsg = null;
|
||||
TbMsg tmp = TbMsg.fromBytes(entityData.getTbMsg().toByteArray());
|
||||
switch (tmp.getOriginator().getEntityType()) {
|
||||
TbMsg originalTbMsg = TbMsg.fromBytes(entityData.getTbMsg().toByteArray());
|
||||
switch (originalTbMsg.getOriginator().getEntityType()) {
|
||||
case DEVICE:
|
||||
String deviceName = entityData.getEntityName();
|
||||
String deviceType = entityData.getEntityType();
|
||||
Device device = getOrCreateDevice(deviceName, deviceType);
|
||||
if (device != null) {
|
||||
tbMsg = new TbMsg(UUIDs.timeBased(), tmp.getType(), device.getId(), tmp.getMetaData().copy(),
|
||||
tmp.getDataType(), tmp.getData(), null, null, 0L);
|
||||
tbMsg = new TbMsg(UUIDs.timeBased(), originalTbMsg.getType(), device.getId(), originalTbMsg.getMetaData().copy(),
|
||||
originalTbMsg.getDataType(), originalTbMsg.getData(), null, null, 0L);
|
||||
}
|
||||
break;
|
||||
case ASSET:
|
||||
String assetName = entityData.getEntityName();
|
||||
Asset asset = ctx.getAssetService().findAssetByTenantIdAndName(edge.getTenantId(), assetName);
|
||||
if (asset != null) {
|
||||
tbMsg = new TbMsg(UUIDs.timeBased(), tmp.getType(), asset.getId(), tmp.getMetaData().copy(),
|
||||
tmp.getDataType(), tmp.getData(), null, null, 0L);
|
||||
tbMsg = new TbMsg(UUIDs.timeBased(), originalTbMsg.getType(), asset.getId(), originalTbMsg.getMetaData().copy(),
|
||||
originalTbMsg.getDataType(), originalTbMsg.getData(), null, null, 0L);
|
||||
}
|
||||
break;
|
||||
case ENTITY_VIEW:
|
||||
String entityViewName = entityData.getEntityName();
|
||||
EntityView entityView = ctx.getEntityViewService().findEntityViewByTenantIdAndName(edge.getTenantId(), entityViewName);
|
||||
if (entityView != null) {
|
||||
tbMsg = new TbMsg(UUIDs.timeBased(), tmp.getType(), entityView.getId(), tmp.getMetaData().copy(),
|
||||
tmp.getDataType(), tmp.getData(), null, null, 0L);
|
||||
tbMsg = new TbMsg(UUIDs.timeBased(), originalTbMsg.getType(), entityView.getId(), originalTbMsg.getMetaData().copy(),
|
||||
originalTbMsg.getDataType(), originalTbMsg.getData(), null, null, 0L);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -537,6 +607,17 @@ public final class EdgeGrpcSession implements Cloneable {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (uplinkMsg.getDeviceUpdateMsgList() != null && !uplinkMsg.getDeviceUpdateMsgList().isEmpty()) {
|
||||
for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) {
|
||||
String deviceName = deviceUpdateMsg.getName();
|
||||
String deviceType = deviceUpdateMsg.getType();
|
||||
switch (deviceUpdateMsg.getMsgType()) {
|
||||
case ENTITY_CREATED_RPC_MESSAGE:
|
||||
getOrCreateDevice(deviceName, deviceType);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return UplinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(e.getMessage()).build();
|
||||
}
|
||||
|
||||
@ -16,5 +16,5 @@
|
||||
package org.thingsboard.server.common.data.edge;
|
||||
|
||||
public enum EdgeQueueEntityType {
|
||||
DASHBOARD, ASSET, DEVICE, ENTITY_VIEW, ALARM, RULE_CHAIN, RULE_CHAIN_METADATA
|
||||
DASHBOARD, ASSET, DEVICE, ENTITY_VIEW, ALARM, RULE_CHAIN, RULE_CHAIN_METADATA, EDGE
|
||||
}
|
||||
|
||||
@ -24,21 +24,16 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.edge.exception.EdgeConnectionException;
|
||||
import org.thingsboard.server.gen.edge.AssetUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.ConnectRequestMsg;
|
||||
import org.thingsboard.server.gen.edge.ConnectResponseCode;
|
||||
import org.thingsboard.server.gen.edge.ConnectResponseMsg;
|
||||
import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.EdgeConfiguration;
|
||||
import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc;
|
||||
import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.EntityUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.RequestMsg;
|
||||
import org.thingsboard.server.gen.edge.RequestMsgType;
|
||||
import org.thingsboard.server.gen.edge.ResponseMsg;
|
||||
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.UplinkMsg;
|
||||
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
|
||||
|
||||
@ -72,12 +67,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
String edgeSecret,
|
||||
Consumer<UplinkResponseMsg> onUplinkResponse,
|
||||
Consumer<EdgeConfiguration> onEdgeUpdate,
|
||||
Consumer<DeviceUpdateMsg> onDeviceUpdate,
|
||||
Consumer<AssetUpdateMsg> onAssetUpdate,
|
||||
Consumer<EntityViewUpdateMsg> onEntityViewUpdate,
|
||||
Consumer<RuleChainUpdateMsg> onRuleChainUpdate,
|
||||
Consumer<RuleChainMetadataUpdateMsg> onRuleChainMetadataUpdate,
|
||||
Consumer<DashboardUpdateMsg> onDashboardUpdate,
|
||||
Consumer<EntityUpdateMsg> onEntityUpdate,
|
||||
Consumer<DownlinkMsg> onDownlink,
|
||||
Consumer<Exception> onError) {
|
||||
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(rpcHost, rpcPort).usePlaintext();
|
||||
@ -92,7 +82,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
channel = builder.build();
|
||||
EdgeRpcServiceGrpc.EdgeRpcServiceStub stub = EdgeRpcServiceGrpc.newStub(channel);
|
||||
log.info("[{}] Sending a connect request to the TB!", edgeKey);
|
||||
this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onDeviceUpdate, onAssetUpdate, onEntityViewUpdate, onRuleChainUpdate, onRuleChainMetadataUpdate, onDashboardUpdate, onDownlink, onError));
|
||||
this.inputStream = stub.handleMsgs(initOutputStream(edgeKey, onUplinkResponse, onEdgeUpdate, onEntityUpdate, onDownlink, onError));
|
||||
this.inputStream.onNext(RequestMsg.newBuilder()
|
||||
.setMsgType(RequestMsgType.CONNECT_RPC_MESSAGE)
|
||||
.setConnectRequestMsg(ConnectRequestMsg.newBuilder().setEdgeRoutingKey(edgeKey).setEdgeSecret(edgeSecret).build())
|
||||
@ -118,12 +108,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
private StreamObserver<ResponseMsg> initOutputStream(String edgeKey,
|
||||
Consumer<UplinkResponseMsg> onUplinkResponse,
|
||||
Consumer<EdgeConfiguration> onEdgeUpdate,
|
||||
Consumer<DeviceUpdateMsg> onDeviceUpdate,
|
||||
Consumer<AssetUpdateMsg> onAssetUpdate,
|
||||
Consumer<EntityViewUpdateMsg> onEntityViewUpdate,
|
||||
Consumer<RuleChainUpdateMsg> onRuleChainUpdate,
|
||||
Consumer<RuleChainMetadataUpdateMsg> onRuleChainMetadataUpdate,
|
||||
Consumer<DashboardUpdateMsg> onDashboardUpdate,
|
||||
Consumer<EntityUpdateMsg> onEntityUpdate,
|
||||
Consumer<DownlinkMsg> onDownlink,
|
||||
Consumer<Exception> onError) {
|
||||
return new StreamObserver<ResponseMsg>() {
|
||||
@ -141,24 +126,9 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
} else if (responseMsg.hasUplinkResponseMsg()) {
|
||||
log.debug("[{}] Uplink response message received {}", edgeKey, responseMsg.getUplinkResponseMsg());
|
||||
onUplinkResponse.accept(responseMsg.getUplinkResponseMsg());
|
||||
} else if (responseMsg.hasDeviceUpdateMsg()) {
|
||||
log.debug("[{}] Device update message received {}", edgeKey, responseMsg.getDeviceUpdateMsg());
|
||||
onDeviceUpdate.accept(responseMsg.getDeviceUpdateMsg());
|
||||
} else if (responseMsg.hasAssetUpdateMsg()) {
|
||||
log.debug("[{}] Asset update message received {}", edgeKey, responseMsg.getAssetUpdateMsg());
|
||||
onAssetUpdate.accept(responseMsg.getAssetUpdateMsg());
|
||||
} else if (responseMsg.hasEntityViewUpdateMsg()) {
|
||||
log.debug("[{}] EntityView update message received {}", edgeKey, responseMsg.getEntityViewUpdateMsg());
|
||||
onEntityViewUpdate.accept(responseMsg.getEntityViewUpdateMsg());
|
||||
} else if (responseMsg.hasRuleChainUpdateMsg()) {
|
||||
log.debug("[{}] Rule Chain udpate message received {}", edgeKey, responseMsg.getRuleChainUpdateMsg());
|
||||
onRuleChainUpdate.accept(responseMsg.getRuleChainUpdateMsg());
|
||||
} else if (responseMsg.hasRuleChainMetadataUpdateMsg()) {
|
||||
log.debug("[{}] Rule Chain Metadata udpate message received {}", edgeKey, responseMsg.getRuleChainMetadataUpdateMsg());
|
||||
onRuleChainMetadataUpdate.accept(responseMsg.getRuleChainMetadataUpdateMsg());
|
||||
} else if (responseMsg.hasDashboardUpdateMsg()) {
|
||||
log.debug("[{}] Dashboard message received {}", edgeKey, responseMsg.getDashboardUpdateMsg());
|
||||
onDashboardUpdate.accept(responseMsg.getDashboardUpdateMsg());
|
||||
} else if (responseMsg.hasEntityUpdateMsg()) {
|
||||
log.debug("[{}] Entity update message received {}", edgeKey, responseMsg.getEntityUpdateMsg());
|
||||
onEntityUpdate.accept(responseMsg.getEntityUpdateMsg());
|
||||
} else if (responseMsg.hasDownlinkMsg()) {
|
||||
log.debug("[{}] Downlink message received for rule chain {}", edgeKey, responseMsg.getDownlinkMsg());
|
||||
onDownlink.accept(responseMsg.getDownlinkMsg());
|
||||
|
||||
@ -15,14 +15,9 @@
|
||||
*/
|
||||
package org.thingsboard.edge.rpc;
|
||||
|
||||
import org.thingsboard.server.gen.edge.AssetUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.EdgeConfiguration;
|
||||
import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.EntityUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.UplinkMsg;
|
||||
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
|
||||
|
||||
@ -34,12 +29,7 @@ public interface EdgeRpcClient {
|
||||
String integrationSecret,
|
||||
Consumer<UplinkResponseMsg> onUplinkResponse,
|
||||
Consumer<EdgeConfiguration> onEdgeUpdate,
|
||||
Consumer<DeviceUpdateMsg> onDeviceUpdate,
|
||||
Consumer<AssetUpdateMsg> onAssetUpdate,
|
||||
Consumer<EntityViewUpdateMsg> onEntityViewUpdate,
|
||||
Consumer<RuleChainUpdateMsg> onRuleChainUpdate,
|
||||
Consumer<RuleChainMetadataUpdateMsg> onRuleChainMetadataUpdate,
|
||||
Consumer<DashboardUpdateMsg> onDashboardUpdate,
|
||||
Consumer<EntityUpdateMsg> onEntityUpdate,
|
||||
Consumer<DownlinkMsg> onDownlink,
|
||||
Consumer<Exception> onError);
|
||||
|
||||
|
||||
@ -34,26 +34,29 @@ service EdgeRpcService {
|
||||
message RequestMsg {
|
||||
RequestMsgType msgType = 1;
|
||||
ConnectRequestMsg connectRequestMsg = 2;
|
||||
DeviceUpdateMsg deviceUpdateMsg = 3;
|
||||
UplinkMsg uplinkMsg = 4;
|
||||
UplinkMsg uplinkMsg = 3;
|
||||
}
|
||||
|
||||
message ResponseMsg {
|
||||
ConnectResponseMsg connectResponseMsg = 1;
|
||||
UplinkResponseMsg uplinkResponseMsg = 2;
|
||||
DeviceUpdateMsg deviceUpdateMsg = 3;
|
||||
RuleChainUpdateMsg ruleChainUpdateMsg = 4;
|
||||
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 5;
|
||||
DashboardUpdateMsg dashboardUpdateMsg = 6;
|
||||
AssetUpdateMsg assetUpdateMsg = 7;
|
||||
EntityViewUpdateMsg entityViewUpdateMsg = 8;
|
||||
DownlinkMsg downlinkMsg = 9;
|
||||
EntityUpdateMsg entityUpdateMsg = 3;
|
||||
DownlinkMsg downlinkMsg = 4;
|
||||
}
|
||||
|
||||
message EntityUpdateMsg {
|
||||
DeviceUpdateMsg deviceUpdateMsg = 1;
|
||||
RuleChainUpdateMsg ruleChainUpdateMsg = 2;
|
||||
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 3;
|
||||
DashboardUpdateMsg dashboardUpdateMsg = 4;
|
||||
AssetUpdateMsg assetUpdateMsg = 5;
|
||||
EntityViewUpdateMsg entityViewUpdateMsg = 6;
|
||||
AlarmUpdateMsg alarmUpdateMsg = 7;
|
||||
}
|
||||
|
||||
enum RequestMsgType {
|
||||
CONNECT_RPC_MESSAGE = 0;
|
||||
UPLINK_RPC_MESSAGE = 1;
|
||||
DEVICE_UPDATE_RPC_MESSAGE = 2;
|
||||
}
|
||||
|
||||
message ConnectRequestMsg {
|
||||
@ -85,7 +88,9 @@ enum UpdateMsgType {
|
||||
ENTITY_CREATED_RPC_MESSAGE = 0;
|
||||
ENTITY_UPDATED_RPC_MESSAGE = 1;
|
||||
ENTITY_DELETED_RPC_MESSAGE = 2;
|
||||
RULE_CHAIN_CUSTOM_MESSAGE = 3;
|
||||
ALARM_ACK_RPC_MESSAGE = 3;
|
||||
ALARM_CLEARK_RPC_MESSAGE = 4;
|
||||
RULE_CHAIN_CUSTOM_MESSAGE = 5;
|
||||
}
|
||||
|
||||
message EntityDataProto {
|
||||
@ -166,6 +171,22 @@ message EntityViewUpdateMsg {
|
||||
EntityType relatedEntityType = 6;
|
||||
}
|
||||
|
||||
message AlarmUpdateMsg {
|
||||
UpdateMsgType msgType = 1;
|
||||
string name = 2;
|
||||
string type = 3;
|
||||
string originatorType = 4;
|
||||
string originatorName = 5;
|
||||
string severity = 6;
|
||||
string status = 7;
|
||||
int64 startTs = 8;
|
||||
int64 endTs = 9;
|
||||
int64 ackTs = 10;
|
||||
int64 clearTs = 11;
|
||||
string details = 12;
|
||||
bool propagate = 13;
|
||||
}
|
||||
|
||||
enum EntityType {
|
||||
DEVICE = 0;
|
||||
ASSET = 1;
|
||||
@ -178,6 +199,8 @@ enum EntityType {
|
||||
message UplinkMsg {
|
||||
int32 uplinkMsgId = 1;
|
||||
repeated EntityDataProto entityData = 2;
|
||||
repeated DeviceUpdateMsg deviceUpdateMsg = 3;
|
||||
repeated AlarmUpdateMsg alarmUpdatemsg = 4;
|
||||
}
|
||||
|
||||
message UplinkResponseMsg {
|
||||
|
||||
@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.EntityView;
|
||||
import org.thingsboard.server.common.data.Event;
|
||||
import org.thingsboard.server.common.data.ShortEdgeInfo;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeQueueEntityType;
|
||||
@ -151,7 +152,6 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Edge findEdgeById(TenantId tenantId, EdgeId edgeId) {
|
||||
log.trace("Executing findEdgeById [{}]", edgeId);
|
||||
@ -349,6 +349,9 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
|
||||
} else {
|
||||
try {
|
||||
switch (tbMsg.getOriginator().getEntityType()) {
|
||||
case EDGE:
|
||||
processEdge(tenantId, tbMsg, callback);
|
||||
break;
|
||||
case ASSET:
|
||||
processAsset(tenantId, tbMsg, callback);
|
||||
break;
|
||||
@ -364,6 +367,9 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
|
||||
case ENTITY_VIEW:
|
||||
processEntityView(tenantId, tbMsg, callback);
|
||||
break;
|
||||
case ALARM:
|
||||
processAlarm(tenantId, tbMsg, callback);
|
||||
break;
|
||||
default:
|
||||
log.debug("Entity type [{}] is not designed to be pushed to edge", tbMsg.getOriginator().getEntityType());
|
||||
}
|
||||
@ -374,26 +380,9 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
|
||||
}
|
||||
|
||||
private void processCustomTbMsg(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) {
|
||||
EdgeId edgeId = null;
|
||||
EdgeQueueEntityType edgeQueueEntityType = null;
|
||||
switch (tbMsg.getOriginator().getEntityType()) {
|
||||
case DEVICE:
|
||||
edgeQueueEntityType = EdgeQueueEntityType.DEVICE;
|
||||
Device device = deviceService.findDeviceById(tenantId, new DeviceId(tbMsg.getOriginator().getId()));
|
||||
edgeId = device.getEdgeId();
|
||||
break;
|
||||
case ASSET:
|
||||
edgeQueueEntityType = EdgeQueueEntityType.ASSET;
|
||||
Asset asset = assetService.findAssetById(tenantId, new AssetId(tbMsg.getOriginator().getId()));
|
||||
edgeId = asset.getEdgeId();
|
||||
break;
|
||||
case ENTITY_VIEW:
|
||||
edgeQueueEntityType = EdgeQueueEntityType.ENTITY_VIEW;
|
||||
EntityView entityView = entityViewService.findEntityViewById(tenantId, new EntityViewId(tbMsg.getOriginator().getId()));
|
||||
edgeId = entityView.getEdgeId();
|
||||
break;
|
||||
}
|
||||
if (edgeId != null) {
|
||||
EdgeId edgeId = getEdgeIdByOriginatorId(tenantId, tbMsg.getOriginator());
|
||||
EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(tbMsg.getOriginator().getEntityType());
|
||||
if (edgeId != null && edgeQueueEntityType != null) {
|
||||
try {
|
||||
saveEventToEdgeQueue(tenantId, edgeId, edgeQueueEntityType, tbMsg.getType(), mapper.writeValueAsString(tbMsg), callback);
|
||||
} catch (IOException e) {
|
||||
@ -402,6 +391,37 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
|
||||
}
|
||||
}
|
||||
|
||||
private EdgeQueueEntityType getEdgeQueueTypeByEntityType(EntityType entityType) {
|
||||
switch (entityType) {
|
||||
case DEVICE:
|
||||
return EdgeQueueEntityType.DEVICE;
|
||||
case ASSET:
|
||||
return EdgeQueueEntityType.ASSET;
|
||||
case ENTITY_VIEW:
|
||||
return EdgeQueueEntityType.ENTITY_VIEW;
|
||||
default:
|
||||
log.info("Unsupported entity type: [{}]", entityType);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private EdgeId getEdgeIdByOriginatorId(TenantId tenantId, EntityId originatorId) {
|
||||
switch (originatorId.getEntityType()) {
|
||||
case DEVICE:
|
||||
Device device = deviceService.findDeviceById(tenantId, new DeviceId(originatorId.getId()));
|
||||
return device.getEdgeId();
|
||||
case ASSET:
|
||||
Asset asset = assetService.findAssetById(tenantId, new AssetId(originatorId.getId()));
|
||||
return asset.getEdgeId();
|
||||
case ENTITY_VIEW:
|
||||
EntityView entityView = entityViewService.findEntityViewById(tenantId, new EntityViewId(originatorId.getId()));
|
||||
return entityView.getEdgeId();
|
||||
default:
|
||||
log.info("Unsupported entity type: [{}]", originatorId.getEntityType());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private void processDevice(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
|
||||
switch (tbMsg.getType()) {
|
||||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
|
||||
@ -421,6 +441,21 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
|
||||
}
|
||||
}
|
||||
|
||||
private void processEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
|
||||
switch (tbMsg.getType()) {
|
||||
case DataConstants.ENTITY_DELETED:
|
||||
case DataConstants.ENTITY_CREATED:
|
||||
case DataConstants.ENTITY_UPDATED:
|
||||
Edge edge = mapper.readValue(tbMsg.getData(), Edge.class);
|
||||
if (edge != null) {
|
||||
pushEventToEdge(tenantId, edge.getId(), EdgeQueueEntityType.EDGE, tbMsg, callback);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
|
||||
}
|
||||
}
|
||||
|
||||
private void processAsset(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
|
||||
switch (tbMsg.getType()) {
|
||||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
|
||||
@ -459,6 +494,25 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
|
||||
}
|
||||
}
|
||||
|
||||
private void processAlarm(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
|
||||
switch (tbMsg.getType()) {
|
||||
case DataConstants.ENTITY_DELETED:
|
||||
case DataConstants.ENTITY_CREATED:
|
||||
case DataConstants.ENTITY_UPDATED:
|
||||
case DataConstants.ALARM_ACK:
|
||||
case DataConstants.ALARM_CLEAR:
|
||||
Alarm alarm = mapper.readValue(tbMsg.getData(), Alarm.class);
|
||||
EdgeId edgeId = getEdgeIdByOriginatorId(tenantId, alarm.getOriginator());
|
||||
EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType());
|
||||
if (edgeId != null && edgeQueueEntityType != null) {
|
||||
pushEventToEdge(tenantId, edgeId, EdgeQueueEntityType.ALARM, tbMsg, callback);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
|
||||
}
|
||||
}
|
||||
|
||||
private void processDashboard(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
|
||||
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DASHBOARD, callback);
|
||||
}
|
||||
@ -562,8 +616,8 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
|
||||
@Override
|
||||
public Edge setRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException {
|
||||
edge.setRootRuleChainId(ruleChainId);
|
||||
Edge saveEdge = saveEdge(edge);
|
||||
ruleChainService.updateEdgeRuleChains(tenantId, saveEdge.getId());
|
||||
Edge savedEdge = saveEdge(edge);
|
||||
ruleChainService.updateEdgeRuleChains(tenantId, savedEdge.getId());
|
||||
RuleChain ruleChain = ruleChainService.findRuleChainById(tenantId, ruleChainId);
|
||||
saveEventToEdgeQueue(tenantId, edge.getId(), EdgeQueueEntityType.RULE_CHAIN, DataConstants.ENTITY_UPDATED, mapper.writeValueAsString(ruleChain), new FutureCallback<Void>() {
|
||||
@Override
|
||||
@ -576,7 +630,7 @@ public class BaseEdgeService extends AbstractEntityService implements EdgeServic
|
||||
log.debug("Failure during event save", t);
|
||||
}
|
||||
});
|
||||
return saveEdge;
|
||||
return savedEdge;
|
||||
}
|
||||
|
||||
private DataValidator<Edge> edgeValidator =
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user