Device events

This commit is contained in:
ViacheslavKlimov 2023-06-22 18:19:37 +03:00
parent 854e059435
commit 926f484230
9 changed files with 216 additions and 50 deletions

View File

@ -28,14 +28,18 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.event.ErrorEvent;
import org.thingsboard.server.common.data.event.Event;
import org.thingsboard.server.common.data.event.LifecycleEvent;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.NotificationRequestId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
@ -44,7 +48,9 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.EdgeNotificationMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ErrorEventProto;
import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
import org.thingsboard.server.gen.transport.TransportProtos.LifecycleEventProto;
import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto;
@ -63,7 +69,6 @@ import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
@ -274,6 +279,10 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
TransportProtos.NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = toCoreMsg.getNotificationSchedulerServiceMsg();
log.trace("[{}] Forwarding message to notification scheduler service {}", id, toCoreMsg.getNotificationSchedulerServiceMsg());
forwardToNotificationSchedulerService(notificationSchedulerServiceMsg, callback);
} else if (toCoreMsg.hasErrorEventMsg()) {
forwardToEventService(toCoreMsg.getErrorEventMsg(), callback);
} else if (toCoreMsg.hasLifecycleEventMsg()) {
forwardToEventService(toCoreMsg.getLifecycleEventMsg(), callback);
}
} catch (Throwable e) {
log.warn("[{}] Failed to process message: {}", id, msg, e);
@ -519,50 +528,52 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
subscriptionManagerService.cancelSubscription(closeProto.getSessionId(), closeProto.getSubscriptionId(), callback);
} else if (msg.hasTsUpdate()) {
TbTimeSeriesUpdateProto proto = msg.getTsUpdate();
long tenantIdMSB = proto.getTenantIdMSB();
long tenantIdLSB = proto.getTenantIdLSB();
subscriptionManagerService.onTimeSeriesUpdate(
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
toTenantId(tenantIdMSB, tenantIdLSB),
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
TbSubscriptionUtils.toTsKvEntityList(proto.getDataList()), callback);
} else if (msg.hasAttrUpdate()) {
TbAttributeUpdateProto proto = msg.getAttrUpdate();
subscriptionManagerService.onAttributesUpdate(
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()),
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
proto.getScope(), TbSubscriptionUtils.toAttributeKvList(proto.getDataList()), callback);
} else if (msg.hasAttrDelete()) {
TbAttributeDeleteProto proto = msg.getAttrDelete();
subscriptionManagerService.onAttributesDelete(
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()),
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
proto.getScope(), proto.getKeysList(), proto.getNotifyDevice(), callback);
} else if (msg.hasTsDelete()) {
TbTimeSeriesDeleteProto proto = msg.getTsDelete();
subscriptionManagerService.onTimeSeriesDelete(
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()),
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
proto.getKeysList(), callback);
} else if (msg.hasAlarmUpdate()) {
TbAlarmUpdateProto proto = msg.getAlarmUpdate();
subscriptionManagerService.onAlarmUpdate(
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()),
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
JacksonUtil.fromString(proto.getAlarm(), AlarmInfo.class),
callback);
} else if (msg.hasAlarmDelete()) {
TbAlarmDeleteProto proto = msg.getAlarmDelete();
subscriptionManagerService.onAlarmDeleted(
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
toTenantId(proto.getTenantIdMSB(), proto.getTenantIdLSB()),
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
JacksonUtil.fromString(proto.getAlarm(), AlarmInfo.class), callback);
} else if (msg.hasNotificationUpdate()) {
TransportProtos.NotificationUpdateProto updateProto = msg.getNotificationUpdate();
TenantId tenantId = TenantId.fromUUID(new UUID(updateProto.getTenantIdMSB(), updateProto.getTenantIdLSB()));
TenantId tenantId = toTenantId(updateProto.getTenantIdMSB(), updateProto.getTenantIdLSB());
UserId recipientId = new UserId(new UUID(updateProto.getRecipientIdMSB(), updateProto.getRecipientIdLSB()));
NotificationUpdate update = JacksonUtil.fromString(updateProto.getUpdate(), NotificationUpdate.class);
subscriptionManagerService.onNotificationUpdate(tenantId, recipientId, update, callback);
} else if (msg.hasNotificationRequestUpdate()) {
TransportProtos.NotificationRequestUpdateProto updateProto = msg.getNotificationRequestUpdate();
TenantId tenantId = TenantId.fromUUID(new UUID(updateProto.getTenantIdMSB(), updateProto.getTenantIdLSB()));
TenantId tenantId = toTenantId(updateProto.getTenantIdMSB(), updateProto.getTenantIdLSB());
NotificationRequestUpdate update = JacksonUtil.fromString(updateProto.getUpdate(), NotificationRequestUpdate.class);
subscriptionManagerService.onNotificationRequestUpdate(tenantId, update, callback);
} else {
@ -584,7 +595,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
if (statsEnabled) {
stats.log(deviceActivityMsg);
}
TenantId tenantId = TenantId.fromUUID(new UUID(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB()));
TenantId tenantId = toTenantId(deviceActivityMsg.getTenantIdMSB(), deviceActivityMsg.getTenantIdLSB());
DeviceId deviceId = new DeviceId(new UUID(deviceActivityMsg.getDeviceIdMSB(), deviceActivityMsg.getDeviceIdLSB()));
try {
stateService.onDeviceActivity(tenantId, deviceId, deviceActivityMsg.getLastActivityTime());
@ -595,7 +606,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) {
TenantId tenantId = TenantId.fromUUID(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
TenantId tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
NotificationRequestId notificationRequestId = new NotificationRequestId(new UUID(msg.getRequestIdMSB(), msg.getRequestIdLSB()));
try {
notificationSchedulerService.scheduleNotificationRequest(tenantId, notificationRequestId, msg.getTs());
@ -627,11 +638,49 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
callback.onSuccess();
}
private void forwardToEventService(ErrorEventProto eventProto, TbCallback callback) {
Event event = ErrorEvent.builder()
.tenantId(toTenantId(eventProto.getTenantIdMSB(), eventProto.getTenantIdLSB()))
.entityId(new UUID(eventProto.getEntityIdMSB(), eventProto.getEntityIdLSB()))
.serviceId(eventProto.getServiceId())
.ts(System.currentTimeMillis())
.method(eventProto.getMethod())
.error(eventProto.getError())
.build();
forwardToEventService(event, callback);
}
private void forwardToEventService(LifecycleEventProto eventProto, TbCallback callback) {
Event event = LifecycleEvent.builder()
.tenantId(toTenantId(eventProto.getTenantIdMSB(), eventProto.getTenantIdLSB()))
.entityId(new UUID(eventProto.getEntityIdMSB(), eventProto.getEntityIdLSB()))
.serviceId(eventProto.getServiceId())
.ts(System.currentTimeMillis())
.lcEventType(eventProto.getLcEventType())
.success(eventProto.getSuccess())
.error(StringUtils.isNotEmpty(eventProto.getError()) ? eventProto.getError() : null)
.build();
forwardToEventService(event, callback);
}
private void forwardToEventService(Event event, TbCallback callback) {
try {
actorContext.getEventService().saveAsync(event);
callback.onSuccess();
} catch (Exception e) {
callback.onFailure(e);
}
}
private void throwNotHandled(Object msg, TbCallback callback) {
log.warn("Message not handled: {}", msg);
callback.onFailure(new RuntimeException("Message not handled!"));
}
private TenantId toTenantId(long tenantIdMSB, long tenantIdLSB) {
return TenantId.fromUUID(new UUID(tenantIdMSB, tenantIdLSB));
}
@Override
protected void stopMainConsumers() {
if (mainConsumer != null) {

View File

@ -974,6 +974,8 @@ message ToCoreMsg {
EdgeNotificationMsgProto edgeNotificationMsg = 5;
DeviceActivityProto deviceActivityMsg = 6;
NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = 7;
LifecycleEventProto lifecycleEventMsg = 8;
ErrorEventProto errorEventMsg = 9;
}
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */
@ -1065,3 +1067,24 @@ message NotificationSchedulerServiceMsg {
message NotificationRuleProcessorMsg {
bytes trigger = 1;
}
message ErrorEventProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 entityIdMSB = 3;
int64 entityIdLSB = 4;
string serviceId = 5;
string method = 6;
string error = 7;
}
message LifecycleEventProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 entityIdMSB = 3;
int64 entityIdLSB = 4;
string serviceId = 5;
string lcEventType = 6;
bool success = 7;
string error = 8;
}

View File

@ -15,11 +15,19 @@
*/
package org.thingsboard.server.common.data.transport.snmp;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum SnmpCommunicationSpec {
TELEMETRY_QUERYING,
CLIENT_ATTRIBUTES_QUERYING,
SHARED_ATTRIBUTES_SETTING,
TELEMETRY_QUERYING("telemetryQuerying"),
CLIENT_ATTRIBUTES_QUERYING("clientAttributesQuerying"),
SHARED_ATTRIBUTES_SETTING("sharedAttributesSetting"),
TO_DEVICE_RPC_REQUEST("rpcRequest");
private final String label;
TO_DEVICE_RPC_REQUEST,
}

View File

@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfigu
import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.common.transport.DeviceUpdatedEvent;
@ -113,19 +114,25 @@ public class SnmpTransportContext extends TransportContext {
SnmpDeviceProfileTransportConfiguration profileTransportConfiguration = (SnmpDeviceProfileTransportConfiguration) deviceProfile.getProfileData().getTransportConfiguration();
SnmpDeviceTransportConfiguration deviceTransportConfiguration = (SnmpDeviceTransportConfiguration) device.getDeviceData().getTransportConfiguration();
DeviceSessionContext deviceSessionContext;
DeviceSessionContext sessionContext;
try {
deviceSessionContext = new DeviceSessionContext(
device, deviceProfile, credentials.getCredentialsId(),
profileTransportConfiguration, deviceTransportConfiguration, this
);
registerSessionMsgListener(deviceSessionContext);
sessionContext = DeviceSessionContext.builder()
.tenantId(deviceProfile.getTenantId())
.device(device)
.deviceProfile(deviceProfile)
.token(credentials.getCredentialsId())
.profileTransportConfiguration(profileTransportConfiguration)
.deviceTransportConfiguration(deviceTransportConfiguration)
.snmpTransportContext(this)
.build();
registerSessionMsgListener(sessionContext);
} catch (Exception e) {
log.error("Failed to establish session for SNMP device {}: {}", device.getId(), e.toString());
transportService.errorEvent(device.getTenantId(), device.getId(), "sessionEstablishing", e);
return;
}
sessions.put(device.getId(), deviceSessionContext);
snmpTransportService.createQueryingTasks(deviceSessionContext);
sessions.put(device.getId(), sessionContext);
snmpTransportService.createQueryingTasks(sessionContext);
log.info("Established SNMP device session for device {}", device.getId());
}
@ -148,14 +155,17 @@ public class SnmpTransportContext extends TransportContext {
sessionContext.initializeTarget(newProfileTransportConfiguration, newDeviceTransportConfiguration);
snmpTransportService.cancelQueryingTasks(sessionContext);
snmpTransportService.createQueryingTasks(sessionContext);
transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.UPDATED, true, null);
} else if (!newDeviceTransportConfiguration.equals(sessionContext.getDeviceTransportConfiguration())) {
sessionContext.setDeviceTransportConfiguration(newDeviceTransportConfiguration);
sessionContext.initializeTarget(newProfileTransportConfiguration, newDeviceTransportConfiguration);
transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.UPDATED, true, null);
} else {
log.trace("Configuration of the device {} was not updated", device);
}
} catch (Exception e) {
log.error("Failed to update session for SNMP device {}: {}", sessionContext.getDeviceId(), e.getMessage());
transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.UPDATED, false, e);
destroyDeviceSession(sessionContext);
}
}
@ -168,28 +178,31 @@ public class SnmpTransportContext extends TransportContext {
transportService.deregisterSession(sessionContext.getSessionInfo());
snmpTransportService.cancelQueryingTasks(sessionContext);
sessions.remove(sessionContext.getDeviceId());
transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.STOPPED, true, null);
log.trace("Unregistered and removed session");
}
private void registerSessionMsgListener(DeviceSessionContext deviceSessionContext) {
private void registerSessionMsgListener(DeviceSessionContext sessionContext) {
transportService.process(DeviceTransportType.SNMP,
TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceSessionContext.getToken()).build(),
TransportProtos.ValidateDeviceTokenRequestMsg.newBuilder().setToken(sessionContext.getToken()).build(),
new TransportServiceCallback<>() {
@Override
public void onSuccess(ValidateDeviceCredentialsResponse msg) {
if (msg.hasDeviceInfo()) {
registerTransportSession(deviceSessionContext, msg);
deviceSessionContext.setSessionTimeoutHandler(() -> {
registerTransportSession(deviceSessionContext, msg);
registerTransportSession(sessionContext, msg);
sessionContext.setSessionTimeoutHandler(() -> {
registerTransportSession(sessionContext, msg);
});
transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.STARTED, true, null);
} else {
log.warn("[{}] Failed to process device auth", deviceSessionContext.getDeviceId());
log.warn("[{}] Failed to process device auth", sessionContext.getDeviceId());
}
}
@Override
public void onError(Throwable e) {
log.warn("[{}] Failed to process device auth: {}", deviceSessionContext.getDeviceId(), e);
log.warn("[{}] Failed to process device auth: {}", sessionContext.getDeviceId(), e);
transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.STARTED, false, e);
}
});
}

View File

@ -144,6 +144,7 @@ public class SnmpTransportService implements TbTransportService {
}
} catch (Exception e) {
log.error("Failed to send SNMP request for device {}: {}", sessionContext.getDeviceId(), e.toString());
transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), config.getSpec().getLabel(), e);
}
}, queryingFrequency, queryingFrequency, TimeUnit.MILLISECONDS);
})
@ -165,6 +166,7 @@ public class SnmpTransportService implements TbTransportService {
List<PDU> request = pduService.createPdus(sessionContext, communicationConfig, values);
RequestContext requestContext = RequestContext.builder()
.communicationSpec(communicationConfig.getSpec())
.method(communicationConfig.getMethod())
.responseMappings(communicationConfig.getAllMappings())
.requestSize(request.size())
.build();
@ -178,6 +180,7 @@ public class SnmpTransportService implements TbTransportService {
snmp.send(pdu, sessionContext.getTarget(), requestContext, sessionContext);
} catch (IOException e) {
log.error("Failed to send SNMP request to device {}: {}", sessionContext.getDeviceId(), e.toString());
transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), e);
}
}
}
@ -223,6 +226,7 @@ public class SnmpTransportService implements TbTransportService {
RequestContext requestContext = RequestContext.builder()
.requestId(toDeviceRpcRequestMsg.getRequestId())
.communicationSpec(communicationConfig.getSpec())
.method(snmpMethod)
.responseMappings(communicationConfig.getAllMappings())
.requestSize(1)
.build();
@ -232,8 +236,10 @@ public class SnmpTransportService implements TbTransportService {
public void processResponseEvent(DeviceSessionContext sessionContext, ResponseEvent event) {
((Snmp) event.getSource()).cancel(event.getRequest(), sessionContext);
RequestContext requestContext = (RequestContext) event.getUserObject();
if (event.getError() != null) {
log.warn("SNMP response error: {}", event.getError().toString());
transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), new RuntimeException(event.getError()));
return;
}
@ -241,12 +247,14 @@ public class SnmpTransportService implements TbTransportService {
if (log.isTraceEnabled()) {
log.trace("Received PDU for device {}: {}", sessionContext.getDeviceId(), responsePdu);
}
RequestContext requestContext = (RequestContext) event.getUserObject();
List<PDU> response;
if (requestContext.getRequestSize() == 1) {
if (responsePdu == null) {
log.debug("No response from SNMP device {}, requestId: {}", sessionContext.getDeviceId(), event.getRequest().getRequestID());
if (requestContext.getMethod() == SnmpMethod.GET) {
transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), new RuntimeException("No response from device"));
}
return;
}
response = List.of(responsePdu);
@ -268,7 +276,11 @@ public class SnmpTransportService implements TbTransportService {
}
responseProcessingExecutor.execute(() -> {
processResponse(sessionContext, response, requestContext);
try {
processResponse(sessionContext, response, requestContext);
} catch (Exception e) {
transportService.errorEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), requestContext.getCommunicationSpec().getLabel(), e);
}
});
}
@ -279,7 +291,7 @@ public class SnmpTransportService implements TbTransportService {
JsonObject responseData = responseDataMappers.get(requestContext.getCommunicationSpec()).map(response, requestContext);
if (responseData.size() == 0) {
log.warn("No values in the SNMP response for device {}", sessionContext.getDeviceId());
return;
throw new IllegalArgumentException("No values in the response");
}
responseProcessor.process(responseData, requestContext, sessionContext);
@ -365,15 +377,17 @@ public class SnmpTransportService implements TbTransportService {
private static class RequestContext {
private final Integer requestId;
private final SnmpCommunicationSpec communicationSpec;
private final SnmpMethod method;
private final List<SnmpMapping> responseMappings;
private final int requestSize;
private List<PDU> responseParts;
@Builder
public RequestContext(Integer requestId, SnmpCommunicationSpec communicationSpec, List<SnmpMapping> responseMappings, int requestSize) {
public RequestContext(Integer requestId, SnmpCommunicationSpec communicationSpec, SnmpMethod method, List<SnmpMapping> responseMappings, int requestSize) {
this.requestId = requestId;
this.communicationSpec = communicationSpec;
this.method = method;
this.responseMappings = responseMappings;
this.requestSize = requestSize;
if (requestSize > 1) {

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.transport.snmp.session;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@ -26,7 +27,9 @@ import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.profile.SnmpDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.data.transport.snmp.SnmpCommunicationSpec;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.service.DefaultTransportService;
@ -58,6 +61,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
private SnmpDeviceTransportConfiguration deviceTransportConfiguration;
@Getter
private final Device device;
@Getter
private final TenantId tenantId;
private final SnmpTransportContext snmpTransportContext;
@ -70,7 +75,8 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
@Getter
private final List<ScheduledFuture<?>> queryingTasks = new LinkedList<>();
public DeviceSessionContext(Device device, DeviceProfile deviceProfile, String token,
@Builder
public DeviceSessionContext(TenantId tenantId, Device device, DeviceProfile deviceProfile, String token,
SnmpDeviceProfileTransportConfiguration profileTransportConfiguration,
SnmpDeviceTransportConfiguration deviceTransportConfiguration,
SnmpTransportContext snmpTransportContext) throws Exception {
@ -78,6 +84,7 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
super.setDeviceId(device.getId());
super.setDeviceProfile(deviceProfile);
this.device = device;
this.tenantId = tenantId;
this.token = token;
this.snmpTransportContext = snmpTransportContext;
@ -134,7 +141,11 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
@Override
public void onAttributeUpdate(UUID sessionId, AttributeUpdateNotificationMsg attributeUpdateNotification) {
log.trace("[{}] Received attributes update notification to device", sessionId);
snmpTransportContext.getSnmpTransportService().onAttributeUpdate(this, attributeUpdateNotification);
try {
snmpTransportContext.getSnmpTransportService().onAttributeUpdate(this, attributeUpdateNotification);
} catch (Exception e) {
snmpTransportContext.getTransportService().errorEvent(getTenantId(), getDeviceId(), SnmpCommunicationSpec.SHARED_ATTRIBUTES_SETTING.getLabel(), e);
}
}
@Override
@ -150,8 +161,12 @@ public class DeviceSessionContext extends DeviceAwareSessionContext implements S
@Override
public void onToDeviceRpcRequest(UUID sessionId, ToDeviceRpcRequestMsg toDeviceRequest) {
log.trace("[{}] Received RPC command to device", sessionId);
snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
try {
snmpTransportContext.getSnmpTransportService().onToDeviceRpcRequest(this, toDeviceRequest);
snmpTransportContext.getTransportService().process(getSessionInfo(), toDeviceRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
} catch (Exception e) {
snmpTransportContext.getTransportService().errorEvent(getTenantId(), getDeviceId(), SnmpCommunicationSpec.TO_DEVICE_RPC_REQUEST.getLabel(), e);
}
}
@Override

View File

@ -26,10 +26,11 @@ public class SnmpTestV2 {
device.start();
Map<String, String> mappings = new HashMap<>();
for (int i = 1; i <= 500; i++) {
String oid = String.format(".1.3.6.1.2.1.%s.1.52", i);
mappings.put(oid, "value_" + i);
}
// for (int i = 1; i <= 500; i++) {
// String oid = String.format(".1.3.6.1.2.1.%s.1.52", i);
// mappings.put(oid, "value_" + i);
// }
mappings.put("1.3.6.1.2.1.266.1.52", "****");
device.setUpMappings(mappings);
new Scanner(System.in).nextLine();

View File

@ -17,6 +17,9 @@ package org.thingsboard.server.common.transport;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
@ -139,6 +142,10 @@ public interface TransportService {
void reportActivity(SessionInfoProto sessionInfo);
void lifecycleEvent(TenantId tenantId, DeviceId deviceId, ComponentLifecycleEvent eventType, boolean success, Throwable error);
void errorEvent(TenantId tenantId, DeviceId deviceId, String method, Throwable error);
void deregisterSession(SessionInfoProto sessionInfo);
void log(SessionInfoProto sessionInfo, String msg);

View File

@ -22,6 +22,7 @@ import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
@ -49,11 +50,12 @@ import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.data.notification.rule.trigger.RateLimitsTrigger;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.data.notification.rule.trigger.RateLimitsTrigger;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.session.SessionMsgType;
@ -817,6 +819,37 @@ public class DefaultTransportService implements TransportService {
sessionsToRemove.forEach(sessionsActivity::remove);
}
@Override
public void lifecycleEvent(TenantId tenantId, DeviceId deviceId, ComponentLifecycleEvent eventType, boolean success, Throwable error) {
ToCoreMsg msg = ToCoreMsg.newBuilder()
.setLifecycleEventMsg(TransportProtos.LifecycleEventProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setEntityIdMSB(deviceId.getId().getMostSignificantBits())
.setEntityIdLSB(deviceId.getId().getLeastSignificantBits())
.setServiceId(serviceInfoProvider.getServiceId())
.setLcEventType(eventType.name())
.setSuccess(success)
.setError(error != null ? ExceptionUtils.getStackTrace(error) : ""))
.build();
sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY);
}
@Override
public void errorEvent(TenantId tenantId, DeviceId deviceId, String method, Throwable error) {
ToCoreMsg msg = ToCoreMsg.newBuilder()
.setErrorEventMsg(TransportProtos.ErrorEventProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setEntityIdMSB(deviceId.getId().getMostSignificantBits())
.setEntityIdLSB(deviceId.getId().getLeastSignificantBits())
.setServiceId(serviceInfoProvider.getServiceId())
.setMethod(method)
.setError(ExceptionUtils.getStackTrace(error)))
.build();
sendToCore(tenantId, deviceId, msg, deviceId.getId(), TransportServiceCallback.EMPTY);
}
@Override
public SessionMetaData registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) {
SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener);
@ -1108,18 +1141,21 @@ public class DefaultTransportService implements TransportService {
}
protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build();
sendToCore(getTenantId(sessionInfo), getDeviceId(sessionInfo), toCoreMsg, getRoutingKey(sessionInfo), callback);
}
private void sendToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, UUID routingKey, TransportServiceCallback<Void> callback) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
if (log.isTraceEnabled()) {
log.trace("[{}][{}] Pushing to topic {} message {}", getTenantId(sessionInfo), getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg);
log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, entityId, tpi.getFullTopicName(), msg);
}
TransportTbQueueCallback transportTbQueueCallback = callback != null ?
new TransportTbQueueCallback(callback) : null;
tbCoreProducerStats.incrementTotal();
StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats);
tbCoreMsgProducer.send(tpi,
new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()),
wrappedCallback);
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, msg), wrappedCallback);
}
private void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {