TMP commit for HTTP transport

This commit is contained in:
Andrew Shvayka 2018-10-16 13:20:40 +03:00
parent 67431a044a
commit 68eabb9ec2
16 changed files with 281 additions and 99 deletions

View File

@ -61,17 +61,17 @@
<artifactId>transport</artifactId>
</dependency>
<!--<dependency>-->
<!--<groupId>org.thingsboard.transport</groupId>-->
<!--<artifactId>http</artifactId>-->
<!--</dependency>-->
<!--<dependency>-->
<!--<groupId>org.thingsboard.transport</groupId>-->
<!--<artifactId>coap</artifactId>-->
<!--<groupId>org.thingsboard.transport</groupId>-->
<!--<artifactId>coap</artifactId>-->
<!--</dependency>-->
<dependency>
<groupId>org.thingsboard.common.transport</groupId>
<artifactId>mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard.common.transport</groupId>
<artifactId>http</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard</groupId>
<artifactId>dao</artifactId>
@ -542,7 +542,8 @@
<args>
<arg>-PprojectBuildDir=${project.build.directory}</arg>
<arg>-PprojectVersion=${project.version}</arg>
<arg>-PmainJar=${project.build.directory}/${project.build.finalName}-boot.${project.packaging}</arg>
<arg>-PmainJar=${project.build.directory}/${project.build.finalName}-boot.${project.packaging}
</arg>
<arg>-PpkgName=${pkg.name}</arg>
<arg>-PpkgInstallFolder=${pkg.installFolder}</arg>
<arg>-PpkgLogFolder=${pkg.unixLogFolder}</arg>

View File

@ -26,6 +26,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
@ -175,7 +176,7 @@ public class LocalTransportService implements TransportService, RuleEngineTransp
}
@Override
public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
public void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) {
sessions.putIfAbsent(toId(sessionInfo), listener);
//TODO: monitor sessions periodically: PING REQ/RESP, etc.
}

View File

@ -19,15 +19,15 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<groupId>org.thingsboard.common</groupId>
<version>2.2.0-SNAPSHOT</version>
<artifactId>transport</artifactId>
</parent>
<groupId>org.thingsboard.transport</groupId>
<groupId>org.thingsboard.common.transport</groupId>
<artifactId>http</artifactId>
<packaging>jar</packaging>
<name>Thingsboard HTTP Transport</name>
<name>Thingsboard HTTP Transport Common</name>
<url>https://thingsboard.io</url>
<properties>
@ -37,8 +37,8 @@
<dependencies>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>transport</artifactId>
<groupId>org.thingsboard.common.transport</groupId>
<artifactId>transport-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,64 +17,68 @@ package org.thingsboard.server.transport.http;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.*;
import org.thingsboard.server.transport.http.session.HttpSessionCtx;
import javax.servlet.http.HttpServletRequest;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;
/**
* @author Andrew Shvayka
*/
@RestController
@ConditionalOnProperty(prefix = "transport.http", value = "enabled", havingValue = "true")
@ConditionalOnProperty(prefix = "transport.http", value = "enabled", havingValue = "true", matchIfMissing = true)
@RequestMapping("/api/v1")
@Slf4j
public class DeviceApiController {
@Value("${http.request_timeout}")
private long defaultTimeout;
@Autowired(required = false)
private SessionMsgProcessor processor;
@Autowired(required = false)
private DeviceAuthService authService;
@Autowired(required = false)
private HostRequestsQuotaService quotaService;
@Autowired
private HttpTransportContext transportContext;
@RequestMapping(value = "/{deviceToken}/attributes", method = RequestMethod.GET, produces = "application/json")
public DeferredResult<ResponseEntity> getDeviceAttributes(@PathVariable("deviceToken") String deviceToken,
@RequestParam(value = "clientKeys", required = false, defaultValue = "") String clientKeys,
@RequestParam(value = "sharedKeys", required = false, defaultValue = "") String sharedKeys,
HttpServletRequest httpRequest) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
if (quotaExceeded(httpRequest, responseWriter)) {
return responseWriter;
}
// HttpSessionCtx ctx = getHttpSessionCtx(responseWriter);
// if (ctx.login(new DeviceTokenCredentials(deviceToken))) {
// GetAttributesRequest request;
// if (StringUtils.isEmpty(clientKeys) && StringUtils.isEmpty(sharedKeys)) {
// request = new BasicGetAttributesRequest(0);
// } else {
// Set<String> clientKeySet = !StringUtils.isEmpty(clientKeys) ? new HashSet<>(Arrays.asList(clientKeys.split(","))) : null;
// Set<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? new HashSet<>(Arrays.asList(sharedKeys.split(","))) : null;
// request = new BasicGetAttributesRequest(0, clientKeySet, sharedKeySet);
// }
// process(ctx, request);
// } else {
// responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
// }
transportContext.getTransportService().process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(deviceToken).build(),
new DeviceAuthCallback(transportContext, responseWriter, sessionInfo -> {
GetAttributeRequestMsg.Builder request = GetAttributeRequestMsg.newBuilder().setRequestId(0);
List<String> clientKeySet = !StringUtils.isEmpty(clientKeys) ? Arrays.asList(clientKeys.split(",")) : null;
List<String> sharedKeySet = !StringUtils.isEmpty(sharedKeys) ? Arrays.asList(sharedKeys.split(",")) : null;
if (clientKeySet != null) {
request.addAllClientAttributeNames(clientKeySet);
}
if (sharedKeySet != null) {
request.addAllSharedAttributeNames(sharedKeySet);
}
TransportService transportService = transportContext.getTransportService();
transportService.registerSession(sessionInfo, TransportProtos.SessionType.SYNC, new HttpSessionListener(transportContext, responseWriter));
transportService.process(sessionInfo, request.build(), new SessionCloseOnErrorCallback(transportService, sessionInfo));
}));
return responseWriter;
}
@ -199,7 +203,7 @@ public class DeviceApiController {
// }
private HttpSessionCtx getHttpSessionCtx(DeferredResult<ResponseEntity> responseWriter) {
return getHttpSessionCtx(responseWriter, defaultTimeout);
return getHttpSessionCtx(responseWriter, transportContext.getDefaultTimeout());
}
private HttpSessionCtx getHttpSessionCtx(DeferredResult<ResponseEntity> responseWriter, long timeout) {
@ -213,7 +217,7 @@ public class DeviceApiController {
// }
private boolean quotaExceeded(HttpServletRequest request, DeferredResult<ResponseEntity> responseWriter) {
if (quotaService.isQuotaExceeded(request.getRemoteAddr())) {
if (transportContext.getQuotaService().isQuotaExceeded(request.getRemoteAddr())) {
log.warn("REST Quota exceeded for [{}] . Disconnect", request.getRemoteAddr());
responseWriter.setResult(new ResponseEntity<>(HttpStatus.BANDWIDTH_LIMIT_EXCEEDED));
return true;
@ -221,4 +225,96 @@ public class DeviceApiController {
return false;
}
private static class DeviceAuthCallback implements TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> {
private final TransportContext transportContext;
private final DeferredResult<ResponseEntity> responseWriter;
private final Consumer<SessionInfoProto> onSuccess;
DeviceAuthCallback(TransportContext transportContext, DeferredResult<ResponseEntity> responseWriter, Consumer<SessionInfoProto> onSuccess) {
this.transportContext = transportContext;
this.responseWriter = responseWriter;
this.onSuccess = onSuccess;
}
@Override
public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
if (msg.hasDeviceInfo()) {
UUID sessionId = UUID.randomUUID();
DeviceInfoProto deviceInfoProto = msg.getDeviceInfo();
SessionInfoProto sessionInfo = SessionInfoProto.newBuilder()
.setNodeId(transportContext.getNodeId())
.setTenantIdMSB(deviceInfoProto.getTenantIdMSB())
.setTenantIdLSB(deviceInfoProto.getTenantIdLSB())
.setDeviceIdMSB(deviceInfoProto.getDeviceIdMSB())
.setDeviceIdLSB(deviceInfoProto.getDeviceIdLSB())
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.build();
onSuccess.accept(sessionInfo);
} else {
responseWriter.setResult(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
}
}
@Override
public void onError(Throwable e) {
log.warn("Failed to process request", e);
responseWriter.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
}
}
private static class SessionCloseOnErrorCallback implements TransportServiceCallback<Void> {
private final TransportService transportService;
private final SessionInfoProto sessionInfo;
SessionCloseOnErrorCallback(TransportService transportService, SessionInfoProto sessionInfo) {
this.transportService = transportService;
this.sessionInfo = sessionInfo;
}
@Override
public void onSuccess(Void msg) {
}
@Override
public void onError(Throwable e) {
transportService.deregisterSession(sessionInfo);
}
}
private static class HttpSessionListener implements SessionMsgListener {
private final TransportContext transportContext;
private final DeferredResult<ResponseEntity> responseWriter;
HttpSessionListener(TransportContext transportContext, DeferredResult<ResponseEntity> responseWriter) {
this.transportContext = transportContext;
this.responseWriter = responseWriter;
}
@Override
public void onGetAttributesResponse(GetAttributeResponseMsg msg) {
responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
}
@Override
public void onAttributeUpdate(AttributeUpdateNotificationMsg msg) {
responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
}
@Override
public void onRemoteSessionCloseCommand(SessionCloseNotificationProto sessionCloseNotification) {
}
@Override
public void onToDeviceRpcRequest(ToDeviceRpcRequestMsg toDeviceRequest) {
}
@Override
public void onToServerRpcResponse(ToServerRpcResponseMsg toServerResponse) {
}
}
}

View File

@ -0,0 +1,37 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.http;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.TransportContext;
/**
* Created by ashvayka on 04.10.18.
*/
@Slf4j
@ConditionalOnProperty(prefix = "transport.mqtt", value = "enabled", havingValue = "true", matchIfMissing = true)
@Component
public class HttpTransportContext extends TransportContext {
@Getter
@Value("${http.request_timeout}")
private long defaultTimeout;
}

View File

@ -24,6 +24,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.kafka.TbNodeIdProvider;
@ -40,48 +41,21 @@ import java.util.concurrent.Executors;
@Slf4j
@ConditionalOnProperty(prefix = "transport.mqtt", value = "enabled", havingValue = "true", matchIfMissing = true)
@Component
@Data
public class MqttTransportContext {
private final ObjectMapper mapper = new ObjectMapper();
@Autowired
private TransportService transportService;
public class MqttTransportContext extends TransportContext {
@Getter
@Autowired(required = false)
private MqttSslHandlerProvider sslHandlerProvider;
@Autowired(required = false)
private HostRequestsQuotaService quotaService;
@Getter
@Autowired
private MqttTransportAdaptor adaptor;
@Autowired
private TbNodeIdProvider nodeIdProvider;
@Getter
@Value("${transport.mqtt.netty.max_payload_size}")
private Integer maxPayloadSize;
@Getter
private SslHandler sslHandler;
@Getter
private ExecutorService executor;
@PostConstruct
public void init() {
executor = Executors.newCachedThreadPool();
}
@PreDestroy
public void stop() {
if (executor != null) {
executor.shutdownNow();
}
}
public String getNodeId() {
return nodeIdProvider.getNodeId();
}
}

View File

@ -506,7 +506,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
.setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
.build();
transportService.process(sessionInfo, getSessionEventMsg(SessionEvent.OPEN), null);
transportService.registerSession(sessionInfo, this);
transportService.registerSession(sessionInfo, TransportProtos.SessionType.ASYNC, this);
checkGatewaySession();
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
}

View File

@ -121,7 +121,7 @@ public class GatewaySessionHandler {
transportService.process(deviceSessionInfo, MqttTransportHandler.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null);
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null);
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null);
transportService.registerSession(deviceSessionInfo, deviceSessionCtx);
transportService.registerSession(deviceSessionInfo, TransportProtos.SessionType.ASYNC, deviceSessionCtx);
}
future.set(devices.get(deviceName));
}

View File

@ -37,7 +37,7 @@
<modules>
<module>transport-api</module>
<module>mqtt</module>
<!--module>http</module-->
<module>http</module>
<!--module>coap</module-->
</modules>

View File

@ -0,0 +1,53 @@
package org.thingsboard.server.common.transport;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import org.thingsboard.server.kafka.TbNodeIdProvider;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by ashvayka on 15.10.18.
*/
@Slf4j
@Data
public class TransportContext {
protected final ObjectMapper mapper = new ObjectMapper();
@Autowired
private TransportService transportService;
@Autowired
private TbNodeIdProvider nodeIdProvider;
@Autowired(required = false)
private HostRequestsQuotaService quotaService;
@Getter
private ExecutorService executor;
@PostConstruct
public void init() {
executor = Executors.newCachedThreadPool();
}
@PreDestroy
public void stop() {
if (executor != null) {
executor.shutdownNow();
}
}
public String getNodeId() {
return nodeIdProvider.getNodeId();
}
}

View File

@ -59,7 +59,7 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
void registerSession(SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener);
void deregisterSession(SessionInfoProto sessionInfo);

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -85,7 +85,7 @@ public class RemoteTransportService implements TransportService {
@Value("${kafka.transport_api.response_auto_commit_interval}")
private int autoCommitInterval;
private ConcurrentMap<UUID, SessionMsgListener> sessions = new ConcurrentHashMap<>();
private ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<>();
@Autowired
private TbKafkaSettings kafkaSettings;
@ -159,8 +159,9 @@ public class RemoteTransportService implements TransportService {
if (toTransportMsg.hasToDeviceSessionMsg()) {
DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg();
UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
SessionMsgListener listener = sessions.get(sessionId);
if (listener != null) {
SessionMetaData md = sessions.get(sessionId);
if (md != null) {
SessionMsgListener listener = md.getListener();
transportCallbackExecutor.submit(() -> {
if (toSessionMsg.hasGetAttributesResponse()) {
listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
@ -178,6 +179,9 @@ public class RemoteTransportService implements TransportService {
listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
}
});
if (md.getSessionType() == SessionType.SYNC) {
deregisterSession(md.getSessionInfo());
}
} else {
//TODO: should we notify the device actor about missed session?
log.debug("[{}] Missing session.", sessionId);
@ -311,8 +315,8 @@ public class RemoteTransportService implements TransportService {
}
@Override
public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
sessions.putIfAbsent(toId(sessionInfo), listener);
public void registerSession(SessionInfoProto sessionInfo, SessionType sessionType, SessionMsgListener listener) {
sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, sessionType, listener));
//TODO: monitor sessions periodically: PING REQ/RESP, etc.
}

View File

@ -0,0 +1,17 @@
package org.thingsboard.server.common.transport.service;
import lombok.Data;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.gen.transport.TransportProtos;
/**
* Created by ashvayka on 15.10.18.
*/
@Data
public class SessionMetaData {
private final TransportProtos.SessionInfoProto sessionInfo;
private final TransportProtos.SessionType sessionType;
private final SessionMsgListener listener;
}

View File

@ -365,7 +365,7 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.thingsboard.transport</groupId>
<groupId>org.thingsboard.common.transport</groupId>
<artifactId>http</artifactId>
<version>${project.version}</version>
</dependency>

View File

@ -34,7 +34,6 @@
</properties>
<modules>
<module>http</module>
<!--<module>coap</module>-->
<module>mqtt-transport</module>
</modules>