Some refactoring

This commit is contained in:
Andrew Shvayka 2018-10-05 18:52:59 +03:00
parent fd6ad413b7
commit caf38f6755
42 changed files with 805 additions and 355 deletions

View File

@ -70,7 +70,7 @@
</dependency>
<dependency>
<groupId>org.thingsboard.transport</groupId>
<artifactId>mqtt</artifactId>
<artifactId>mqtt-common</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard</groupId>

View File

@ -43,7 +43,8 @@ public class TBKafkaConsumerTemplate<T> {
private final String topic;
@Builder
private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder, TbKafkaRequestIdExtractor<T> requestIdExtractor,
private TBKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder,
TbKafkaRequestIdExtractor<T> requestIdExtractor,
String clientId, String groupId, String topic,
boolean autoCommit, int autoCommitIntervalMs) {
Properties props = settings.toProps();

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -27,13 +27,12 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.header.Header;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
/**
* Created by ashvayka on 24.09.18.
@ -48,7 +47,7 @@ public class TBKafkaProducerTemplate<T> {
private TbKafkaEnricher<T> enricher = ((value, responseTopic, requestId) -> value);
private final TbKafkaPartitioner<T> partitioner;
private List<PartitionInfo> partitionInfoList;
private ConcurrentMap<String, List<PartitionInfo>> partitionInfoMap;
@Getter
private final String defaultTopic;
@ -78,11 +77,16 @@ public class TBKafkaProducerTemplate<T> {
log.trace("Failed to create topic: {}", e.getMessage(), e);
}
//Maybe this should not be cached, but we don't plan to change size of partitions
this.partitionInfoList = producer.partitionsFor(defaultTopic);
this.partitionInfoMap = new ConcurrentHashMap<>();
this.partitionInfoMap.putIfAbsent(defaultTopic, producer.partitionsFor(defaultTopic));
}
public T enrich(T value, String responseTopic, UUID requestId) {
return enricher.enrich(value, responseTopic, requestId);
T enrich(T value, String responseTopic, UUID requestId) {
if (enricher != null) {
return enricher.enrich(value, responseTopic, requestId);
} else {
return value;
}
}
public Future<RecordMetadata> send(String key, T value) {
@ -101,7 +105,7 @@ public class TBKafkaProducerTemplate<T> {
byte[] data = encoder.encode(value);
ProducerRecord<String, byte[]> record;
Integer partition = getPartition(topic, key, value, data);
record = new ProducerRecord<>(this.defaultTopic, partition, timestamp, key, data, headers);
record = new ProducerRecord<>(topic, partition, timestamp, key, data, headers);
return producer.send(record);
}
@ -109,7 +113,7 @@ public class TBKafkaProducerTemplate<T> {
if (partitioner == null) {
return null;
} else {
return partitioner.partition(this.defaultTopic, key, value, data, partitionInfoList);
return partitioner.partition(topic, key, value, data, partitionInfoMap.computeIfAbsent(topic, producer::partitionsFor));
}
}
}

View File

@ -0,0 +1,12 @@
package org.thingsboard.server.kafka;
import java.util.function.Consumer;
/**
* Created by ashvayka on 05.10.18.
*/
public interface TbKafkaHandler<Request, Response> {
void handle(Request request, Consumer<Response> onSuccess, Consumer<Throwable> onFailure);
}

View File

@ -93,13 +93,12 @@ public class TbKafkaRequestTemplate<Request, Response> {
ConsumerRecords<String, byte[]> responses = responseTemplate.poll(Duration.ofMillis(pollInterval));
responses.forEach(response -> {
Header requestIdHeader = response.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
Response decocedResponse = null;
Response decodedResponse = null;
UUID requestId = null;
if (requestIdHeader == null) {
try {
decocedResponse = responseTemplate.decode(response);
requestId = responseTemplate.extractRequestId(decocedResponse);
decodedResponse = responseTemplate.decode(response);
requestId = responseTemplate.extractRequestId(decodedResponse);
} catch (IOException e) {
log.error("Failed to decode response", e);
}
@ -107,17 +106,17 @@ public class TbKafkaRequestTemplate<Request, Response> {
requestId = bytesToUuid(requestIdHeader.value());
}
if (requestId == null) {
log.error("[{}] Missing requestId in header and response", response);
log.error("[{}] Missing requestId in header and body", response);
} else {
ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
if (expectedResponse == null) {
log.trace("[{}] Invalid or stale request", requestId);
} else {
try {
if (decocedResponse == null) {
decocedResponse = responseTemplate.decode(response);
if (decodedResponse == null) {
decodedResponse = responseTemplate.decode(response);
}
expectedResponse.future.set(decocedResponse);
expectedResponse.future.set(decodedResponse);
} catch (IOException e) {
expectedResponse.future.setException(e);
}

View File

@ -0,0 +1,173 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.kafka;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
/**
* Created by ashvayka on 25.09.18.
*/
@Slf4j
public class TbKafkaResponseTemplate<Request, Response> {
private final TBKafkaConsumerTemplate<Request> requestTemplate;
private final TBKafkaProducerTemplate<Response> responseTemplate;
private final TbKafkaHandler<Request, Response> handler;
private final ConcurrentMap<UUID, String> pendingRequests;
private final ExecutorService executor;
private final long maxPendingRequests;
private final long pollInterval;
private volatile boolean stopped = false;
@Builder
public TbKafkaResponseTemplate(TBKafkaConsumerTemplate<Request> requestTemplate,
TBKafkaProducerTemplate<Response> responseTemplate,
TbKafkaHandler<Request, Response> handler,
long pollInterval,
long maxPendingRequests,
ExecutorService executor) {
this.requestTemplate = requestTemplate;
this.responseTemplate = responseTemplate;
this.handler = handler;
this.pendingRequests = new ConcurrentHashMap<>();
this.maxPendingRequests = maxPendingRequests;
this.pollInterval = pollInterval;
this.executor = executor;
}
public void init() {
this.responseTemplate.init();
requestTemplate.subscribe();
executor.submit(() -> {
long nextCleanupMs = 0L;
while (!stopped) {
ConsumerRecords<String, byte[]> requests = requestTemplate.poll(Duration.ofMillis(pollInterval));
requests.forEach(request -> {
Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
if (requestIdHeader == null) {
log.error("[{}] Missing requestId in header", request);
return;
}
UUID requestId = bytesToUuid(requestIdHeader.value());
if (requestId == null) {
log.error("[{}] Missing requestId in header and body", request);
return;
}
Header responseTopicHeader = request.headers().lastHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER);
if (responseTopicHeader == null) {
log.error("[{}] Missing response topic in header", request);
return;
}
String responseTopic = bytesToUuid(responseTopicHeader.value());
if (requestId == null) {
log.error("[{}] Missing requestId in header and body", request);
return;
}
Request decodedRequest = null;
String responseTopic = null;
try {
if (decodedRequest == null) {
decodedRequest = requestTemplate.decode(request);
}
executor.submit(() -> {
handler.handle(decodedRequest, );
});
} catch (IOException e) {
expectedRequest.future.setException(e);
}
});
}
});
}
public void stop() {
stopped = true;
}
public ListenableFuture<Response> post(String key, Request request) {
if (tickSize > maxPendingRequests) {
return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
}
UUID requestId = UUID.randomUUID();
List<Header> headers = new ArrayList<>(2);
headers.add(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)));
headers.add(new RecordHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic())));
SettableFuture<Response> future = SettableFuture.create();
pendingRequests.putIfAbsent(requestId, new ResponseMetaData<>(tickTs + maxRequestTimeout, future));
request = requestTemplate.enrich(request, responseTemplate.getTopic(), requestId);
requestTemplate.send(key, request, headers);
return future;
}
private byte[] uuidToBytes(UUID uuid) {
ByteBuffer buf = ByteBuffer.allocate(16);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return buf.array();
}
private static UUID bytesToUuid(byte[] bytes) {
ByteBuffer bb = ByteBuffer.wrap(bytes);
long firstLong = bb.getLong();
long secondLong = bb.getLong();
return new UUID(firstLong, secondLong);
}
private byte[] stringToBytes(String string) {
return string.getBytes(StandardCharsets.UTF_8);
}
private String bytesToString(byte[] data) {
return new String(data, StandardCharsets.UTF_8);
}
private static class ResponseMetaData<T> {
private final long expTime;
private final SettableFuture<T> future;
ResponseMetaData(long ts, SettableFuture<T> future) {
this.expTime = ts;
this.future = future;
}
}
}

View File

@ -78,6 +78,11 @@
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>

View File

@ -1,3 +1,18 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport;
import org.thingsboard.server.gen.transport.TransportProtos;

View File

@ -1,3 +1,18 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport;
/**
@ -6,6 +21,6 @@ package org.thingsboard.server.common.transport;
public interface TransportServiceCallback<T> {
void onSuccess(T msg);
void onError(Exception e);
void onError(Throwable e);
}

View File

@ -16,10 +16,12 @@
package org.thingsboard.server.common.transport.quota.tenant;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryCleaner;
@Component
@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
public class TenantIntervalRegistryCleaner extends IntervalRegistryCleaner {
public TenantIntervalRegistryCleaner(TenantMsgsIntervalRegistry intervalRegistry,

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.common.transport.quota.tenant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.quota.inmemory.IntervalRegistryLogger;
@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
public class TenantIntervalRegistryLogger extends IntervalRegistryLogger {
private final long logIntervalMin;

View File

@ -16,10 +16,12 @@
package org.thingsboard.server.common.transport.quota.tenant;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.quota.inmemory.KeyBasedIntervalRegistry;
@Component
@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
public class TenantMsgsIntervalRegistry extends KeyBasedIntervalRegistry {
public TenantMsgsIntervalRegistry(@Value("${quota.rule.tenant.intervalMs}") long intervalDurationMs,

View File

@ -16,10 +16,12 @@
package org.thingsboard.server.common.transport.quota.tenant;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.quota.AbstractQuotaService;
@Component
@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
public class TenantQuotaService extends AbstractQuotaService {
public TenantQuotaService(TenantMsgsIntervalRegistry requestRegistry, TenantRequestLimitPolicy requestsPolicy,

View File

@ -16,10 +16,12 @@
package org.thingsboard.server.common.transport.quota.tenant;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.quota.RequestLimitPolicy;
@Component
@ConditionalOnProperty(prefix = "quota.rule.tenant", value = "enabled", havingValue = "true", matchIfMissing = false)
public class TenantRequestLimitPolicy extends RequestLimitPolicy {
public TenantRequestLimitPolicy(@Value("${quota.rule.tenant.limit}") long limit) {

View File

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

View File

@ -95,3 +95,17 @@ message ValidateDeviceTokenResponseMsg {
DeviceInfoProto deviceInfo = 1;
}
/**
* Main messages;
*/
message TransportToRuleEngineMsg {
}
message TransportApiRequestMsg {
ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1;
}
message TransportApiResponseMsg {
ValidateDeviceTokenResponseMsg validateTokenResponseMsg = 1;
}

View File

@ -17,7 +17,11 @@ package org.thingsboard.server.common.transport.quota;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.common.transport.quota.host.*;
import org.thingsboard.server.common.transport.quota.host.HostIntervalRegistryCleaner;
import org.thingsboard.server.common.transport.quota.host.HostIntervalRegistryLogger;
import org.thingsboard.server.common.transport.quota.host.HostRequestIntervalRegistry;
import org.thingsboard.server.common.transport.quota.host.HostRequestLimitPolicy;
import org.thingsboard.server.common.transport.quota.host.HostRequestsQuotaService;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

View File

@ -366,7 +366,7 @@
</dependency>
<dependency>
<groupId>org.thingsboard.transport</groupId>
<artifactId>mqtt</artifactId>
<artifactId>mqtt-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>

View File

@ -24,10 +24,10 @@
<artifactId>transport</artifactId>
</parent>
<groupId>org.thingsboard.transport</groupId>
<artifactId>mqtt</artifactId>
<artifactId>mqtt-common</artifactId>
<packaging>jar</packaging>
<name>Thingsboard MQTT Transport</name>
<name>Thingsboard MQTT Transport Common</name>
<url>https://thingsboard.io</url>
<properties>

View File

@ -395,7 +395,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
@Override
public void onError(Exception e) {
public void onError(Throwable e) {
log.trace("[{}] Failed to process credentials: {}", address, userName, e);
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
ctx.close();
@ -521,6 +521,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
if (deviceSessionCtx.isConnected()) {
transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
}
}
}

View File

@ -57,12 +57,12 @@ public class DeviceSessionCtx extends MqttDeviceAwareSessionContext {
@Override
public void onMsg(SessionActorToAdaptorMsg msg) throws SessionException {
try {
adaptor.convertToAdaptorMsg(this, msg).ifPresent(this::pushToNetwork);
} catch (AdaptorException e) {
//TODO: close channel with disconnect;
logAndWrap(e);
}
// try {
// adaptor.convertToAdaptorMsg(this, msg).ifPresent(this::pushToNetwork);
// } catch (AdaptorException e) {
// //TODO: close channel with disconnect;
// logAndWrap(e);
// }
}
private void logAndWrap(AdaptorException e) throws SessionException {

View File

@ -135,7 +135,7 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext {
private MqttMessage createMqttPublishMsg(String topic, GetAttributesResponse response) {
JsonObject result = new JsonObject();
result.addProperty("id", response.getRequestId());
result.addProperty(DEVICE_PROPERTY, device.getName());
// result.addProperty(DEVICE_PROPERTY, device.getName());
Optional<AttributesKVMsg> responseData = response.getData();
if (responseData.isPresent()) {
AttributesKVMsg msg = responseData.get();
@ -183,14 +183,14 @@ public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext {
private MqttMessage createMqttPublishMsg(String topic, AttributesKVMsg data) {
JsonObject result = new JsonObject();
result.addProperty(DEVICE_PROPERTY, device.getName());
// result.addProperty(DEVICE_PROPERTY, device.getName());
result.add("data", JsonConverter.toJson(data, false));
return createMqttPublishMsg(topic, result);
}
private MqttMessage createMqttPublishMsg(String topic, ToDeviceRpcRequestMsg data) {
JsonObject result = new JsonObject();
result.addProperty(DEVICE_PROPERTY, device.getName());
// result.addProperty(DEVICE_PROPERTY, device.getName());
result.add("data", JsonConverter.toJson(data, true));
return createMqttPublishMsg(topic, result);
}

View File

@ -0,0 +1,284 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.session;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload;
/**
* Created by ashvayka on 19.01.17.
*/
@Slf4j
public class GatewaySessionCtx {
private static final String DEFAULT_DEVICE_TYPE = "default";
public static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
public static final String DEVICE_PROPERTY = "device";
// private final Device gateway;
// private final SessionId gatewaySessionId;
// private final SessionMsgProcessor processor;
// private final DeviceService deviceService;
// private final DeviceAuthService authService;
// private final RelationService relationService;
// private final Map<String, GatewayDeviceSessionCtx> devices;
// private final ConcurrentMap<String, Integer> mqttQoSMap;
private ChannelHandlerContext channel;
// public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
// this.processor = processor;
// this.deviceService = deviceService;
// this.authService = authService;
// this.relationService = relationService;
// this.gateway = gatewaySessionCtx.getDevice();
// this.gatewaySessionId = gatewaySessionCtx.getSessionId();
// this.devices = new HashMap<>();
// this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap();
// }
public GatewaySessionCtx(DeviceSessionCtx deviceSessionCtx) {
}
public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
JsonElement json = getJson(msg);
String deviceName = checkDeviceName(getDeviceName(json));
String deviceType = getDeviceType(json);
onDeviceConnect(deviceName, deviceType);
ack(msg);
}
private void onDeviceConnect(String deviceName, String deviceType) {
// if (!devices.containsKey(deviceName)) {
// Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
// if (device == null) {
// device = new Device();
// device.setTenantId(gateway.getTenantId());
// device.setName(deviceName);
// device.setType(deviceType);
// device.setCustomerId(gateway.getCustomerId());
// device = deviceService.saveDevice(device);
// relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
// processor.onDeviceAdded(device);
// }
// GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device, mqttQoSMap);
// devices.put(deviceName, ctx);
// log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
// processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
// processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
// }
}
public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
// String deviceName = checkDeviceName(getDeviceName(getJson(msg)));
// GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
// if (deviceSessionCtx != null) {
// processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
// deviceSessionCtx.setClosed(true);
// log.debug("[{}] Removed device [{}] from the gateway session", gatewaySessionId, deviceName);
// } else {
// log.debug("[{}] Device [{}] was already removed from the gateway session", gatewaySessionId, deviceName);
// }
// ack(msg);
}
public void onGatewayDisconnect() {
// devices.forEach((k, v) -> {
// processor.process(SessionCloseMsg.onDisconnect(v.getSessionId()));
// });
}
public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException {
// JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
// int requestId = mqttMsg.variableHeader().messageId();
// if (json.isJsonObject()) {
// JsonObject jsonObj = json.getAsJsonObject();
// for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
// String deviceName = checkDeviceConnected(deviceEntry.getKey());
// if (!deviceEntry.getValue().isJsonArray()) {
// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
// }
// BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
// JsonArray deviceData = deviceEntry.getValue().getAsJsonArray();
// for (JsonElement element : deviceData) {
// JsonConverter.parseWithTs(request, element.getAsJsonObject());
// }
// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
// }
// } else {
// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
// }
}
public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException {
// JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
// if (json.isJsonObject()) {
// JsonObject jsonObj = json.getAsJsonObject();
// String deviceName = checkDeviceConnected(jsonObj.get(DEVICE_PROPERTY).getAsString());
// Integer requestId = jsonObj.get("id").getAsInt();
// String data = jsonObj.get("data").toString();
// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
// ack(mqttMsg);
// } else {
// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
// }
}
public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException {
// JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
// int requestId = mqttMsg.variableHeader().messageId();
// if (json.isJsonObject()) {
// JsonObject jsonObj = json.getAsJsonObject();
// for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
// String deviceName = checkDeviceConnected(deviceEntry.getKey());
// if (!deviceEntry.getValue().isJsonObject()) {
// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
// }
// long ts = System.currentTimeMillis();
// BasicAttributesUpdateRequest request = new BasicAttributesUpdateRequest(requestId);
// JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
// request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
// }
// } else {
// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
// }
}
public void onDeviceAttributesRequest(MqttPublishMessage msg) throws AdaptorException {
// JsonElement json = validateJsonPayload(gatewaySessionId, msg.payload());
// if (json.isJsonObject()) {
// JsonObject jsonObj = json.getAsJsonObject();
// int requestId = jsonObj.get("id").getAsInt();
// String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
// boolean clientScope = jsonObj.get("client").getAsBoolean();
// Set<String> keys;
// if (jsonObj.has("key")) {
// keys = Collections.singleton(jsonObj.get("key").getAsString());
// } else {
// JsonArray keysArray = jsonObj.get("keys").getAsJsonArray();
// keys = new HashSet<>();
// for (JsonElement keyObj : keysArray) {
// keys.add(keyObj.getAsString());
// }
// }
//
// BasicGetAttributesRequest request;
// if (clientScope) {
// request = new BasicGetAttributesRequest(requestId, keys, null);
// } else {
// request = new BasicGetAttributesRequest(requestId, null, keys);
// }
// GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
// processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
// new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
// ack(msg);
// } else {
// throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
// }
}
private String checkDeviceConnected(String deviceName) {
// if (!devices.containsKey(deviceName)) {
// log.debug("[{}] Missing device [{}] for the gateway session", gatewaySessionId, deviceName);
// onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE);
// }
// return deviceName;
return null;
}
private String checkDeviceName(String deviceName) {
if (StringUtils.isEmpty(deviceName)) {
throw new RuntimeException("Device name is empty!");
} else {
return deviceName;
}
}
private String getDeviceName(JsonElement json) throws AdaptorException {
return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString();
}
private String getDeviceType(JsonElement json) throws AdaptorException {
JsonElement type = json.getAsJsonObject().get("type");
return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString();
}
private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
// return JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
return null;
}
protected SessionMsgProcessor getProcessor() {
// return processor;
return null;
}
DeviceAuthService getAuthService() {
// return authService;
return null;
}
public void setChannel(ChannelHandlerContext channel) {
this.channel = channel;
}
private void ack(MqttPublishMessage msg) {
if (msg.variableHeader().messageId() > 0) {
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
}
}
void writeAndFlush(MqttMessage mqttMessage) {
channel.writeAndFlush(mqttMessage);
}
}

View File

@ -32,12 +32,12 @@ public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionCo
private final ConcurrentMap<String, Integer> mqttQoSMap;
public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, ConcurrentMap<String, Integer> mqttQoSMap) {
super(processor, authService);
super();
this.mqttQoSMap = mqttQoSMap;
}
public MqttDeviceAwareSessionContext(SessionMsgProcessor processor, DeviceAuthService authService, Device device, ConcurrentMap<String, Integer> mqttQoSMap) {
super(processor, authService, device);
super();
this.mqttQoSMap = mqttQoSMap;
}

View File

@ -41,32 +41,12 @@
<artifactId>transport</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<groupId>org.thingsboard.transport</groupId>
<artifactId>mqtt-common</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<groupId>org.thingsboard.common</groupId>
<artifactId>queue</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@ -19,14 +19,13 @@ import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.Arrays;
@SpringBootConfiguration
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server"})
@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.kafka"})
public class ThingsboardMqttTransportApplication {
private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";

View File

@ -0,0 +1,45 @@
package org.thingsboard.server.mqtt.service;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.Nullable;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
/**
* Created by ashvayka on 05.10.18.
*/
public class AsyncCallbackTemplate {
public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess,
Consumer<Throwable> onFailure) {
withCallback(future, onSuccess, onFailure, null);
}
public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess,
Consumer<Throwable> onFailure, Executor executor) {
FutureCallback<T> callback = new FutureCallback<T>() {
@Override
public void onSuccess(@Nullable T result) {
try {
onSuccess.accept(result);
} catch (Throwable th) {
onFailure(th);
}
}
@Override
public void onFailure(Throwable t) {
onFailure.accept(t);
}
};
if (executor != null) {
Futures.addCallback(future, callback, executor);
} else {
Futures.addCallback(future, callback);
}
}
}

View File

@ -0,0 +1,110 @@
package org.thingsboard.server.mqtt.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
import org.thingsboard.server.gen.transport.TransportProtos.*;
import org.thingsboard.server.kafka.TbKafkaSettings;
import org.thingsboard.server.transport.mqtt.MqttTransportContext;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by ashvayka on 05.10.18.
*/
@Service
public class MqttTransportService implements TransportService {
@Value("${kafka.rule-engine.topic}")
private String ruleEngineTopic;
@Value("${kafka.transport-api.requests-topic}")
private String transportApiRequestsTopic;
@Value("${kafka.transport-api.responses-topic}")
private String transportApiResponsesTopic;
@Value("${kafka.transport-api.max_pending_requests}")
private long maxPendingRequests;
@Value("${kafka.transport-api.max_requests_timeout}")
private long maxRequestsTimeout;
@Value("${kafka.transport-api.response_poll_interval}")
private int responsePollDuration;
@Value("${kafka.transport-api.response_auto_commit_interval}")
private int autoCommitInterval;
@Autowired
private TbKafkaSettings kafkaSettings;
//We use this to get the node id. We should replace this with a component that provides the node id.
@Autowired
private MqttTransportContext transportContext;
private ExecutorService transportCallbackExecutor;
private TbKafkaRequestTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
@PostConstruct
public void init() {
this.transportCallbackExecutor = Executors.newCachedThreadPool();
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaProducerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.defaultTopic(transportApiRequestsTopic);
requestBuilder.encoder(new TransportApiRequestEncoder());
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaConsumerTemplate.builder();
responseBuilder.settings(kafkaSettings);
responseBuilder.topic(transportApiResponsesTopic + "." + transportContext.getNodeId());
responseBuilder.clientId(transportContext.getNodeId());
responseBuilder.groupId("transport-node");
responseBuilder.autoCommit(true);
responseBuilder.autoCommitIntervalMs(autoCommitInterval);
responseBuilder.decoder(new TransportApiResponseDecoder());
TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder
<TransportApiRequestMsg, TransportApiResponseMsg> builder = TbKafkaRequestTemplate.builder();
builder.requestTemplate(requestBuilder.build());
builder.responseTemplate(responseBuilder.build());
builder.maxPendingRequests(maxPendingRequests);
builder.maxRequestTimeout(maxRequestsTimeout);
builder.pollInterval(responsePollDuration);
transportApiTemplate = builder.build();
transportApiTemplate.init();
}
@PreDestroy
public void destroy() {
if (transportApiTemplate != null) {
transportApiTemplate.stop();
}
if (transportCallbackExecutor != null) {
transportCallbackExecutor.shutdownNow();
}
}
@Override
public void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<ValidateDeviceTokenResponseMsg> callback) {
AsyncCallbackTemplate.withCallback(transportApiTemplate.post(msg.getToken(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()),
response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
}
@Override
public void process(SessionEventMsg msg, TransportServiceCallback<Void> callback) {
}
@Override
public void process(PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
}
@Override
public void process(PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
}
}

View File

@ -0,0 +1,14 @@
package org.thingsboard.server.mqtt.service;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.kafka.TbKafkaEncoder;
/**
* Created by ashvayka on 05.10.18.
*/
public class TransportApiRequestEncoder implements TbKafkaEncoder<TransportApiRequestMsg> {
@Override
public byte[] encode(TransportApiRequestMsg value) {
return value.toByteArray();
}
}

View File

@ -0,0 +1,16 @@
package org.thingsboard.server.mqtt.service;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.kafka.TbKafkaDecoder;
import java.io.IOException;
/**
* Created by ashvayka on 05.10.18.
*/
public class TransportApiResponseDecoder implements TbKafkaDecoder<TransportApiResponseMsg> {
@Override
public TransportApiResponseMsg decode(byte[] data) throws IOException {
return TransportApiResponseMsg.parseFrom(data);
}
}

View File

@ -44,6 +44,27 @@ mqtt:
# Type of the key store
key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
#Quota parameters
quota:
host:
# Max allowed number of API requests in interval for single host
limit: "${QUOTA_HOST_LIMIT:10000}"
# Interval duration
intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
# Maximum silence duration for host after which Host removed from QuotaService. Must be bigger than intervalMs
ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
# Interval for scheduled task that cleans expired records. TTL is used for expiring
cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
# Enable Host API Limits
enabled: "${QUOTA_HOST_ENABLED:false}"
# Array of whitelist hosts
whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
# Array of blacklist hosts
blacklist: "${QUOTA_HOST_BLACKLIST:}"
log:
topSize: 10
intervalMin: 2
kafka:
enabled: true
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
@ -52,6 +73,13 @@ kafka:
batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
linger.ms: "${TB_KAFKA_LINGER_MS:1}"
buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
topic:
telemetry: "${TB_TELEMETRY_TOPIC:tb.transport.telemetry}"
requests: "${TB_TELEMETRY_TOPIC:tb.transport.requests}"
transport-api:
requests-topic: "${TB_TRANSPORT_API_REQUEST_TOPIC:tb.transport.api.requests}"
responses-topic: "${TB_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
max_pending_requests: "${TB_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
max_requests_timeout: "${TB_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
response_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
response_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
# Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
rule-engine:
topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"

View File

@ -1,280 +0,0 @@
/**
* Copyright © 2016-2018 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt.session;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.SessionId;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.msg.core.*;
import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
import org.thingsboard.server.common.transport.SessionMsgProcessor;
import org.thingsboard.server.common.transport.adaptor.AdaptorException;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.transport.mqtt.MqttTransportHandler;
import org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import static org.thingsboard.server.transport.mqtt.adaptors.JsonMqttAdaptor.validateJsonPayload;
/**
* Created by ashvayka on 19.01.17.
*/
@Slf4j
public class GatewaySessionCtx {
private static final String DEFAULT_DEVICE_TYPE = "default";
public static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
public static final String DEVICE_PROPERTY = "device";
private final Device gateway;
private final SessionId gatewaySessionId;
private final SessionMsgProcessor processor;
private final DeviceService deviceService;
private final DeviceAuthService authService;
private final RelationService relationService;
private final Map<String, GatewayDeviceSessionCtx> devices;
private final ConcurrentMap<String, Integer> mqttQoSMap;
private ChannelHandlerContext channel;
public GatewaySessionCtx(SessionMsgProcessor processor, DeviceService deviceService, DeviceAuthService authService, RelationService relationService, DeviceSessionCtx gatewaySessionCtx) {
this.processor = processor;
this.deviceService = deviceService;
this.authService = authService;
this.relationService = relationService;
this.gateway = gatewaySessionCtx.getDevice();
this.gatewaySessionId = gatewaySessionCtx.getSessionId();
this.devices = new HashMap<>();
this.mqttQoSMap = gatewaySessionCtx.getMqttQoSMap();
}
public GatewaySessionCtx(DeviceSessionCtx deviceSessionCtx) {
}
public void onDeviceConnect(MqttPublishMessage msg) throws AdaptorException {
JsonElement json = getJson(msg);
String deviceName = checkDeviceName(getDeviceName(json));
String deviceType = getDeviceType(json);
onDeviceConnect(deviceName, deviceType);
ack(msg);
}
private void onDeviceConnect(String deviceName, String deviceType) {
if (!devices.containsKey(deviceName)) {
Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), deviceName);
if (device == null) {
device = new Device();
device.setTenantId(gateway.getTenantId());
device.setName(deviceName);
device.setType(deviceType);
device.setCustomerId(gateway.getCustomerId());
device = deviceService.saveDevice(device);
relationService.saveRelationAsync(new EntityRelation(gateway.getId(), device.getId(), "Created"));
processor.onDeviceAdded(device);
}
GatewayDeviceSessionCtx ctx = new GatewayDeviceSessionCtx(this, device, mqttQoSMap);
devices.put(deviceName, ctx);
log.debug("[{}] Added device [{}] to the gateway session", gatewaySessionId, deviceName);
processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new AttributesSubscribeMsg())));
processor.process(new BasicTransportToDeviceSessionActorMsg(device, new BasicAdaptorToSessionActorMsg(ctx, new RpcSubscribeMsg())));
}
}
public void onDeviceDisconnect(MqttPublishMessage msg) throws AdaptorException {
String deviceName = checkDeviceName(getDeviceName(getJson(msg)));
GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
if (deviceSessionCtx != null) {
processor.process(SessionCloseMsg.onDisconnect(deviceSessionCtx.getSessionId()));
deviceSessionCtx.setClosed(true);
log.debug("[{}] Removed device [{}] from the gateway session", gatewaySessionId, deviceName);
} else {
log.debug("[{}] Device [{}] was already removed from the gateway session", gatewaySessionId, deviceName);
}
ack(msg);
}
public void onGatewayDisconnect() {
devices.forEach((k, v) -> {
processor.process(SessionCloseMsg.onDisconnect(v.getSessionId()));
});
}
public void onDeviceTelemetry(MqttPublishMessage mqttMsg) throws AdaptorException {
JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
int requestId = mqttMsg.variableHeader().messageId();
if (json.isJsonObject()) {
JsonObject jsonObj = json.getAsJsonObject();
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
String deviceName = checkDeviceConnected(deviceEntry.getKey());
if (!deviceEntry.getValue().isJsonArray()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
BasicTelemetryUploadRequest request = new BasicTelemetryUploadRequest(requestId);
JsonArray deviceData = deviceEntry.getValue().getAsJsonArray();
for (JsonElement element : deviceData) {
JsonConverter.parseWithTs(request, element.getAsJsonObject());
}
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
}
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
}
public void onDeviceRpcResponse(MqttPublishMessage mqttMsg) throws AdaptorException {
JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
if (json.isJsonObject()) {
JsonObject jsonObj = json.getAsJsonObject();
String deviceName = checkDeviceConnected(jsonObj.get(DEVICE_PROPERTY).getAsString());
Integer requestId = jsonObj.get("id").getAsInt();
String data = jsonObj.get("data").toString();
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, new ToDeviceRpcResponseMsg(requestId, data))));
ack(mqttMsg);
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
}
public void onDeviceAttributes(MqttPublishMessage mqttMsg) throws AdaptorException {
JsonElement json = validateJsonPayload(gatewaySessionId, mqttMsg.payload());
int requestId = mqttMsg.variableHeader().messageId();
if (json.isJsonObject()) {
JsonObject jsonObj = json.getAsJsonObject();
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
String deviceName = checkDeviceConnected(deviceEntry.getKey());
if (!deviceEntry.getValue().isJsonObject()) {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
long ts = System.currentTimeMillis();
BasicAttributesUpdateRequest request = new BasicAttributesUpdateRequest(requestId);
JsonObject deviceData = deviceEntry.getValue().getAsJsonObject();
request.add(JsonConverter.parseValues(deviceData).stream().map(kv -> new BaseAttributeKvEntry(kv, ts)).collect(Collectors.toList()));
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
}
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
}
public void onDeviceAttributesRequest(MqttPublishMessage msg) throws AdaptorException {
JsonElement json = validateJsonPayload(gatewaySessionId, msg.payload());
if (json.isJsonObject()) {
JsonObject jsonObj = json.getAsJsonObject();
int requestId = jsonObj.get("id").getAsInt();
String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
boolean clientScope = jsonObj.get("client").getAsBoolean();
Set<String> keys;
if (jsonObj.has("key")) {
keys = Collections.singleton(jsonObj.get("key").getAsString());
} else {
JsonArray keysArray = jsonObj.get("keys").getAsJsonArray();
keys = new HashSet<>();
for (JsonElement keyObj : keysArray) {
keys.add(keyObj.getAsString());
}
}
BasicGetAttributesRequest request;
if (clientScope) {
request = new BasicGetAttributesRequest(requestId, keys, null);
} else {
request = new BasicGetAttributesRequest(requestId, null, keys);
}
GatewayDeviceSessionCtx deviceSessionCtx = devices.get(deviceName);
processor.process(new BasicTransportToDeviceSessionActorMsg(deviceSessionCtx.getDevice(),
new BasicAdaptorToSessionActorMsg(deviceSessionCtx, request)));
ack(msg);
} else {
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
}
}
private String checkDeviceConnected(String deviceName) {
if (!devices.containsKey(deviceName)) {
log.debug("[{}] Missing device [{}] for the gateway session", gatewaySessionId, deviceName);
onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE);
}
return deviceName;
}
private String checkDeviceName(String deviceName) {
if (StringUtils.isEmpty(deviceName)) {
throw new RuntimeException("Device name is empty!");
} else {
return deviceName;
}
}
private String getDeviceName(JsonElement json) throws AdaptorException {
return json.getAsJsonObject().get(DEVICE_PROPERTY).getAsString();
}
private String getDeviceType(JsonElement json) throws AdaptorException {
JsonElement type = json.getAsJsonObject().get("type");
return type == null || type instanceof JsonNull ? DEFAULT_DEVICE_TYPE : type.getAsString();
}
private JsonElement getJson(MqttPublishMessage mqttMsg) throws AdaptorException {
return JsonMqttAdaptor.validateJsonPayload(gatewaySessionId, mqttMsg.payload());
}
protected SessionMsgProcessor getProcessor() {
return processor;
}
DeviceAuthService getAuthService() {
return authService;
}
public void setChannel(ChannelHandlerContext channel) {
this.channel = channel;
}
private void ack(MqttPublishMessage msg) {
if (msg.variableHeader().messageId() > 0) {
writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(msg.variableHeader().messageId()));
}
}
void writeAndFlush(MqttMessage mqttMessage) {
channel.writeAndFlush(mqttMessage);
}
}

View File

@ -37,7 +37,7 @@
<modules>
<module>http</module>
<module>coap</module>
<module>mqtt</module>
<module>mqtt-common</module>
<module>mqtt-transport</module>
</modules>