From 926f4842300c92f289b65f46a90aaacf9c51bd1f Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 22 Jun 2023 18:19:37 +0300 Subject: [PATCH] Device events --- .../queue/DefaultTbCoreConsumerService.java | 73 ++++++++++++++++--- common/cluster-api/src/main/proto/queue.proto | 23 ++++++ .../transport/snmp/SnmpCommunicationSpec.java | 16 +++- .../transport/snmp/SnmpTransportContext.java | 43 +++++++---- .../snmp/service/SnmpTransportService.java | 22 +++++- .../snmp/session/DeviceSessionContext.java | 23 +++++- .../server/transport/snmp/SnmpTestV2.java | 9 ++- .../common/transport/TransportService.java | 7 ++ .../service/DefaultTransportService.java | 50 +++++++++++-- 9 files changed, 216 insertions(+), 50 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index dc24d6df33..db0b7c3809 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -28,14 +28,18 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.alarm.AlarmInfo; +import org.thingsboard.server.common.data.event.ErrorEvent; +import org.thingsboard.server.common.data.event.Event; +import org.thingsboard.server.common.data.event.LifecycleEvent; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.NotificationRequestId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger; import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; -import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; @@ -44,7 +48,9 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.EdgeNotificationMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.ErrorEventProto; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; +import org.thingsboard.server.gen.transport.TransportProtos.LifecycleEventProto; import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto; @@ -63,7 +69,6 @@ import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; -import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.DataDecodingEncodingService; @@ -274,6 +279,10 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService() { @Override public void onSuccess(ValidateDeviceCredentialsResponse msg) { if (msg.hasDeviceInfo()) { - registerTransportSession(deviceSessionContext, msg); - deviceSessionContext.setSessionTimeoutHandler(() -> { - registerTransportSession(deviceSessionContext, msg); + registerTransportSession(sessionContext, msg); + sessionContext.setSessionTimeoutHandler(() -> { + registerTransportSession(sessionContext, msg); }); + transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.STARTED, true, null); } else { - log.warn("[{}] Failed to process device auth", deviceSessionContext.getDeviceId()); + log.warn("[{}] Failed to process device auth", sessionContext.getDeviceId()); } } @Override public void onError(Throwable e) { - log.warn("[{}] Failed to process device auth: {}", deviceSessionContext.getDeviceId(), e); + log.warn("[{}] Failed to process device auth: {}", sessionContext.getDeviceId(), e); + transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.STARTED, false, e); } }); } diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java index 9177839c23..d14cafb2c2 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java @@ -144,6 +144,7 @@ public class SnmpTransportService implements TbTransportService { } } catch (Exception e) { log.error("Failed to send SNMP request for device {}: {}", sessionContext.getDeviceId(), e.toString()); + transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), config.getSpec().getLabel(), e); } }, queryingFrequency, queryingFrequency, TimeUnit.MILLISECONDS); }) @@ -165,6 +166,7 @@ public class SnmpTransportService implements TbTransportService { List request = pduService.createPdus(sessionContext, communicationConfig, values); RequestContext requestContext = RequestContext.builder() .communicationSpec(communicationConfig.getSpec()) + .method(communicationConfig.getMethod()) .responseMappings(communicationConfig.getAllMappings()) .requestSize(request.size()) .build(); @@ -178,6 +180,7 @@ public class SnmpTransportService implements TbTransportService { snmp.send(pdu, sessionContext.getTarget(), requestContext, sessionContext); } catch (IOException e) { log.error("Failed to send SNMP request to device {}: {}", sessionContext.getDeviceId(), e.toString()); + transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), e); } } } @@ -223,6 +226,7 @@ public class SnmpTransportService implements TbTransportService { RequestContext requestContext = RequestContext.builder() .requestId(toDeviceRpcRequestMsg.getRequestId()) .communicationSpec(communicationConfig.getSpec()) + .method(snmpMethod) .responseMappings(communicationConfig.getAllMappings()) .requestSize(1) .build(); @@ -232,8 +236,10 @@ public class SnmpTransportService implements TbTransportService { public void processResponseEvent(DeviceSessionContext sessionContext, ResponseEvent event) { ((Snmp) event.getSource()).cancel(event.getRequest(), sessionContext); + RequestContext requestContext = (RequestContext) event.getUserObject(); if (event.getError() != null) { log.warn("SNMP response error: {}", event.getError().toString()); + transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), new RuntimeException(event.getError())); return; } @@ -241,12 +247,14 @@ public class SnmpTransportService implements TbTransportService { if (log.isTraceEnabled()) { log.trace("Received PDU for device {}: {}", sessionContext.getDeviceId(), responsePdu); } - RequestContext requestContext = (RequestContext) event.getUserObject(); List response; if (requestContext.getRequestSize() == 1) { if (responsePdu == null) { log.debug("No response from SNMP device {}, requestId: {}", sessionContext.getDeviceId(), event.getRequest().getRequestID()); + if (requestContext.getMethod() == SnmpMethod.GET) { + transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), new RuntimeException("No response from device")); + } return; } response = List.of(responsePdu); @@ -268,7 +276,11 @@ public class SnmpTransportService implements TbTransportService { } responseProcessingExecutor.execute(() -> { - processResponse(sessionContext, response, requestContext); + try { + processResponse(sessionContext, response, requestContext); + } catch (Exception e) { + transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), e); + } }); } @@ -279,7 +291,7 @@ public class SnmpTransportService implements TbTransportService { JsonObject responseData = responseDataMappers.get(requestContext.getCommunicationSpec()).map(response, requestContext); if (responseData.size() == 0) { log.warn("No values in the SNMP response for device {}", sessionContext.getDeviceId()); - return; + throw new IllegalArgumentException("No values in the response"); } responseProcessor.process(responseData, requestContext, sessionContext); @@ -365,15 +377,17 @@ public class SnmpTransportService implements TbTransportService { private static class RequestContext { private final Integer requestId; private final SnmpCommunicationSpec communicationSpec; + private final SnmpMethod method; private final List responseMappings; private final int requestSize; private List responseParts; @Builder - public RequestContext(Integer requestId, SnmpCommunicationSpec communicationSpec, List responseMappings, int requestSize) { + public RequestContext(Integer requestId, SnmpCommunicationSpec communicationSpec, SnmpMethod method, List responseMappings, int requestSize) { this.requestId = requestId; this.communicationSpec = communicationSpec; + this.method = method; this.responseMappings = responseMappings; this.requestSize = requestSize; if (requestSize > 1) { diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java index fede19824f..b95a11e8c2 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/session/DeviceSessionContext.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.transport.snmp.session; +import lombok.Builder; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -26,7 +27,9 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration; import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rpc.RpcStatus; +import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.service.DefaultTransportService; @@ -58,6 +61,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S private SnmpDeviceTransportConfiguration deviceTransportConfiguration; @Getter private final Device device; + @Getter + private final TenantId tenantId; private final SnmpTransportContext snmpTransportContext; @@ -70,7 +75,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @Getter private final List> queryingTasks = new LinkedList<>(); - public DeviceSessionContext(Device device, DeviceProfile deviceProfile, String token, + @Builder + public DeviceSessionContext(TenantId tenantId, Device device, DeviceProfile deviceProfile, String token, SnmpDeviceProfileTransportConfiguration profileTransportConfiguration, SnmpDeviceTransportConfiguration deviceTransportConfiguration, SnmpTransportContext snmpTransportContext) throws Exception { @@ -78,6 +84,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S super.setDeviceId(device.getId()); super.setDeviceProfile(deviceProfile); this.device = device; + this.tenantId = tenantId; this.token = token; this.snmpTransportContext = snmpTransportContext; @@ -134,7 +141,11 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @Override public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification) { log.trace("[{}] Received attributes update notification to device", sessionId); - snmpTransportContext.getSnmpTransportService().onAttributeUpdate(this, attributeUpdateNotification); + try { + snmpTransportContext.getSnmpTransportService().onAttributeUpdate(this, attributeUpdateNotification); + } catch (Exception e) { + snmpTransportContext.getTransportService().errorEvent(getTenantId(), getDeviceId(), SnmpCommunicationSpec.SHARED_ATTRIBUTES_SETTING.getLabel(), e); + } } @Override @@ -150,8 +161,12 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S @Override public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) { log.trace("[{}] Received RPC command to device", sessionId); - snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest); - snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); + try { + snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest); + snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY); + } catch (Exception e) { + snmpTransportContext.getTransportService().errorEvent(getTenantId(), getDeviceId(), SnmpCommunicationSpec.TO_DEVICE_RPC_REQUEST.getLabel(), e); + } } @Override diff --git a/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpTestV2.java b/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpTestV2.java index 55ed8fb97f..fb7c374d01 100644 --- a/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpTestV2.java +++ b/common/transport/snmp/src/test/java/org/thingsboard/server/transport/snmp/SnmpTestV2.java @@ -26,10 +26,11 @@ public class SnmpTestV2 { device.start(); Map mappings = new HashMap<>(); - for (int i = 1; i <= 500; i++) { - String oid = String.format(".1.3.6.1.2.1.%s.1.52", i); - mappings.put(oid, "value_" + i); - } +// for (int i = 1; i <= 500; i++) { +// String oid = String.format(".1.3.6.1.2.1.%s.1.52", i); +// mappings.put(oid, "value_" + i); +// } + mappings.put("1.3.6.1.2.1.266.1.52", "****"); device.setUpMappings(mappings); new Scanner(System.in).nextLine(); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index cd61d42016..9d52997c46 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -17,6 +17,9 @@ package org.thingsboard.server.common.transport; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; @@ -139,6 +142,10 @@ public interface TransportService { void reportActivity(SessionInfoProto sessionInfo); + void lifecycleEvent(TenantId tenantId, DeviceId deviceId, ComponentLifecycleEvent eventType, boolean success, Throwable error); + + void errorEvent(TenantId tenantId, DeviceId deviceId, String method, Throwable error); + void deregisterSession(SessionInfoProto sessionInfo); void log(SessionInfoProto sessionInfo, String msg); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 3b827edbae..d33b3cb42f 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -22,6 +22,7 @@ import com.google.gson.Gson; import com.google.gson.JsonObject; import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; @@ -49,11 +50,12 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.limit.LimitedApi; +import org.thingsboard.server.common.data.notification.rule.trigger.RateLimitsTrigger; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; -import org.thingsboard.server.common.data.notification.rule.trigger.RateLimitsTrigger; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; @@ -817,6 +819,37 @@ public class DefaultTransportService implements TransportService { sessionsToRemove.forEach(sessionsActivity::remove); } + @Override + public void lifecycleEvent(TenantId tenantId, DeviceId deviceId, ComponentLifecycleEvent eventType, boolean success, Throwable error) { + ToCoreMsg msg = ToCoreMsg.newBuilder() + .setLifecycleEventMsg(TransportProtos.LifecycleEventProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setEntityIdMSB(deviceId.getId().getMostSignificantBits()) + .setEntityIdLSB(deviceId.getId().getLeastSignificantBits()) + .setServiceId(serviceInfoProvider.getServiceId()) + .setLcEventType(eventType.name()) + .setSuccess(success) + .setError(error != null ? ExceptionUtils.getStackTrace(error) : "")) + .build(); + sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY); + } + + @Override + public void errorEvent(TenantId tenantId, DeviceId deviceId, String method, Throwable error) { + ToCoreMsg msg = ToCoreMsg.newBuilder() + .setErrorEventMsg(TransportProtos.ErrorEventProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setEntityIdMSB(deviceId.getId().getMostSignificantBits()) + .setEntityIdLSB(deviceId.getId().getLeastSignificantBits()) + .setServiceId(serviceInfoProvider.getServiceId()) + .setMethod(method) + .setError(ExceptionUtils.getStackTrace(error))) + .build(); + sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY); + } + @Override public SessionMetaData registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) { SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener); @@ -1108,18 +1141,21 @@ public class DefaultTransportService implements TransportService { } protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo)); + ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build(); + sendToCore(getTenantId(sessionInfo), getDeviceId(sessionInfo), toCoreMsg, getRoutingKey(sessionInfo), callback); + } + + private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, UUID routingKey, TransportServiceCallback callback) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); if (log.isTraceEnabled()) { - log.trace("[{}][{}] Pushing to topic {} message {}", getTenantId(sessionInfo), getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg); + log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, entityId, tpi.getFullTopicName(), msg); } + TransportTbQueueCallback transportTbQueueCallback = callback != null ? new TransportTbQueueCallback(callback) : null; tbCoreProducerStats.incrementTotal(); StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats); - tbCoreMsgProducer.send(tpi, - new TbProtoQueueMsg<>(getRoutingKey(sessionInfo), - ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), - wrappedCallback); + tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), wrappedCallback); } private void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {