sparkPlug: start add Handler
This commit is contained in:
parent
7b38623d95
commit
3865e36fc8
@ -16,14 +16,7 @@
|
|||||||
package org.thingsboard.server.transport.mqtt.session;
|
package org.thingsboard.server.transport.mqtt.session;
|
||||||
|
|
||||||
import io.netty.handler.codec.mqtt.MqttQoS;
|
import io.netty.handler.codec.mqtt.MqttQoS;
|
||||||
import org.thingsboard.server.common.data.DeviceProfile;
|
|
||||||
import org.thingsboard.server.common.data.DeviceTransportType;
|
|
||||||
import org.thingsboard.server.common.data.TransportPayloadType;
|
|
||||||
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
|
|
||||||
import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration;
|
|
||||||
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
|
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
|
||||||
import org.thingsboard.server.transport.mqtt.util.MqttTopicFilter;
|
|
||||||
import org.thingsboard.server.transport.mqtt.util.MqttTopicFilterFactory;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|||||||
@ -0,0 +1,149 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2022 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.transport.mqtt.session;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelFuture;
|
||||||
|
import io.netty.handler.codec.mqtt.MqttMessage;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.thingsboard.server.common.data.DeviceProfile;
|
||||||
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
|
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||||
|
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||||
|
import org.thingsboard.server.common.transport.TransportService;
|
||||||
|
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||||
|
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Created by ashvayka on 19.01.17.
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class SparkplugNodeSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
|
||||||
|
|
||||||
|
private final GatewaySessionHandler parent;
|
||||||
|
private final TransportService transportService;
|
||||||
|
|
||||||
|
public SparkplugNodeSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo,
|
||||||
|
DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,
|
||||||
|
TransportService transportService) {
|
||||||
|
super(UUID.randomUUID(), mqttQoSMap);
|
||||||
|
this.parent = parent;
|
||||||
|
setSessionInfo(SessionInfoProto.newBuilder()
|
||||||
|
.setNodeId(parent.getNodeId())
|
||||||
|
.setSessionIdMSB(sessionId.getMostSignificantBits())
|
||||||
|
.setSessionIdLSB(sessionId.getLeastSignificantBits())
|
||||||
|
.setDeviceIdMSB(deviceInfo.getDeviceId().getId().getMostSignificantBits())
|
||||||
|
.setDeviceIdLSB(deviceInfo.getDeviceId().getId().getLeastSignificantBits())
|
||||||
|
.setTenantIdMSB(deviceInfo.getTenantId().getId().getMostSignificantBits())
|
||||||
|
.setTenantIdLSB(deviceInfo.getTenantId().getId().getLeastSignificantBits())
|
||||||
|
.setCustomerIdMSB(deviceInfo.getCustomerId().getId().getMostSignificantBits())
|
||||||
|
.setCustomerIdLSB(deviceInfo.getCustomerId().getId().getLeastSignificantBits())
|
||||||
|
.setDeviceName(deviceInfo.getDeviceName())
|
||||||
|
.setDeviceType(deviceInfo.getDeviceType())
|
||||||
|
.setGwSessionIdMSB(parent.getSessionId().getMostSignificantBits())
|
||||||
|
.setGwSessionIdLSB(parent.getSessionId().getLeastSignificantBits())
|
||||||
|
.setDeviceProfileIdMSB(deviceInfo.getDeviceProfileId().getId().getMostSignificantBits())
|
||||||
|
.setDeviceProfileIdLSB(deviceInfo.getDeviceProfileId().getId().getLeastSignificantBits())
|
||||||
|
.build());
|
||||||
|
setDeviceInfo(deviceInfo);
|
||||||
|
setConnected(true);
|
||||||
|
setDeviceProfile(deviceProfile);
|
||||||
|
this.transportService = transportService;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UUID getSessionId() {
|
||||||
|
return sessionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int nextMsgId() {
|
||||||
|
return parent.nextMsgId();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg response) {
|
||||||
|
try {
|
||||||
|
parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), response).ifPresent(parent::writeAndFlush);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
|
||||||
|
log.trace("[{}] Received attributes update notification to device", sessionId);
|
||||||
|
try {
|
||||||
|
parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), notification).ifPresent(parent::writeAndFlush);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg request) {
|
||||||
|
log.trace("[{}] Received RPC command to device", sessionId);
|
||||||
|
try {
|
||||||
|
parent.getPayloadAdaptor().convertToGatewayPublish(this, getDeviceInfo().getDeviceName(), request).ifPresent(
|
||||||
|
payload -> {
|
||||||
|
ChannelFuture channelFuture = parent.writeAndFlush(payload);
|
||||||
|
if (request.getPersisted()) {
|
||||||
|
channelFuture.addListener(result -> {
|
||||||
|
if (result.cause() == null) {
|
||||||
|
if (!isAckExpected(payload)) {
|
||||||
|
transportService.process(getSessionInfo(), request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
|
||||||
|
} else if (request.getPersisted()) {
|
||||||
|
transportService.process(getSessionInfo(), request, RpcStatus.SENT, TransportServiceCallback.EMPTY);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
} catch (Exception e) {
|
||||||
|
transportService.process(getSessionInfo(),
|
||||||
|
TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
|
||||||
|
.setRequestId(request.getRequestId()).setError("Failed to convert device RPC command to MQTT msg").build(), TransportServiceCallback.EMPTY);
|
||||||
|
log.trace("[{}] Failed to convert device attributes response to MQTT msg", sessionId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
|
||||||
|
log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
|
||||||
|
parent.deregisterSession(getDeviceInfo().getDeviceName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg toServerResponse) {
|
||||||
|
// This feature is not supported in the TB IoT Gateway yet.
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onDeviceDeleted(DeviceId deviceId) {
|
||||||
|
parent.onDeviceDeleted(this.getSessionInfo().getDeviceName());
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isAckExpected(MqttMessage message) {
|
||||||
|
return message.fixedHeader().qosLevel().value() > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user