Merge hotfix 3.6
This commit is contained in:
commit
386e635bb9
@ -20,12 +20,16 @@ import com.fasterxml.jackson.databind.JsonNode;
|
|||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.util.ConcurrentReferenceHashMap;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
|
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||||
import org.thingsboard.server.cache.ota.OtaPackageDataCache;
|
import org.thingsboard.server.cache.ota.OtaPackageDataCache;
|
||||||
import org.thingsboard.server.cluster.TbClusterService;
|
import org.thingsboard.server.cluster.TbClusterService;
|
||||||
import org.thingsboard.server.common.data.ApiUsageState;
|
import org.thingsboard.server.common.data.ApiUsageState;
|
||||||
@ -104,10 +108,11 @@ import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
|||||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||||
import org.thingsboard.server.service.resource.TbResourceService;
|
import org.thingsboard.server.service.resource.TbResourceService;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import javax.annotation.PreDestroy;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
@ -144,12 +149,28 @@ public class DefaultTransportApiService implements TransportApiService {
|
|||||||
private final OtaPackageDataCache otaPackageDataCache;
|
private final OtaPackageDataCache otaPackageDataCache;
|
||||||
private final QueueService queueService;
|
private final QueueService queueService;
|
||||||
|
|
||||||
private final ConcurrentMap<String, ReentrantLock> deviceCreationLocks = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, ReentrantLock> deviceCreationLocks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
|
||||||
|
|
||||||
|
@Value("${queue.transport_api.max_core_handler_threads:16}")
|
||||||
|
private int maxCoreHandlerThreads;
|
||||||
|
|
||||||
|
ListeningExecutorService handlerExecutor;
|
||||||
|
|
||||||
private static boolean checkIsMqttCredentials(DeviceCredentials credentials) {
|
private static boolean checkIsMqttCredentials(DeviceCredentials credentials) {
|
||||||
return credentials != null && DeviceCredentialsType.MQTT_BASIC.equals(credentials.getCredentialsType());
|
return credentials != null && DeviceCredentialsType.MQTT_BASIC.equals(credentials.getCredentialsType());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init() {
|
||||||
|
handlerExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(maxCoreHandlerThreads, "transport-api-service-core-handler"));
|
||||||
|
}
|
||||||
|
@PreDestroy
|
||||||
|
public void destroy() {
|
||||||
|
if (handlerExecutor != null) {
|
||||||
|
handlerExecutor.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {
|
public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {
|
||||||
TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();
|
TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();
|
||||||
@ -157,16 +178,19 @@ public class DefaultTransportApiService implements TransportApiService {
|
|||||||
|
|
||||||
if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
|
if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
|
||||||
ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
|
ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
|
||||||
result = validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN);
|
final String token = msg.getToken();
|
||||||
|
result = handlerExecutor.submit(() -> validateCredentials(token, DeviceCredentialsType.ACCESS_TOKEN));
|
||||||
} else if (transportApiRequestMsg.hasValidateBasicMqttCredRequestMsg()) {
|
} else if (transportApiRequestMsg.hasValidateBasicMqttCredRequestMsg()) {
|
||||||
TransportProtos.ValidateBasicMqttCredRequestMsg msg = transportApiRequestMsg.getValidateBasicMqttCredRequestMsg();
|
TransportProtos.ValidateBasicMqttCredRequestMsg msg = transportApiRequestMsg.getValidateBasicMqttCredRequestMsg();
|
||||||
result = validateCredentials(msg);
|
result = handlerExecutor.submit(() -> validateCredentials(msg));
|
||||||
} else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {
|
} else if (transportApiRequestMsg.hasValidateX509CertRequestMsg()) {
|
||||||
ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();
|
ValidateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateX509CertRequestMsg();
|
||||||
result = validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE);
|
final String hash = msg.getHash();
|
||||||
|
result = handlerExecutor.submit(() -> validateCredentials(hash, DeviceCredentialsType.X509_CERTIFICATE));
|
||||||
} else if (transportApiRequestMsg.hasValidateOrCreateX509CertRequestMsg()) {
|
} else if (transportApiRequestMsg.hasValidateOrCreateX509CertRequestMsg()) {
|
||||||
TransportProtos.ValidateOrCreateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateOrCreateX509CertRequestMsg();
|
TransportProtos.ValidateOrCreateDeviceX509CertRequestMsg msg = transportApiRequestMsg.getValidateOrCreateX509CertRequestMsg();
|
||||||
result = validateOrCreateDeviceX509Certificate(msg.getCertificateChain());
|
final String certChain = msg.getCertificateChain();
|
||||||
|
result = handlerExecutor.submit(() -> validateOrCreateDeviceX509Certificate(certChain));
|
||||||
} else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
|
} else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
|
||||||
result = handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg());
|
result = handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg());
|
||||||
} else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) {
|
} else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) {
|
||||||
@ -175,7 +199,8 @@ public class DefaultTransportApiService implements TransportApiService {
|
|||||||
result = handle(transportApiRequestMsg.getLwM2MRequestMsg());
|
result = handle(transportApiRequestMsg.getLwM2MRequestMsg());
|
||||||
} else if (transportApiRequestMsg.hasValidateDeviceLwM2MCredentialsRequestMsg()) {
|
} else if (transportApiRequestMsg.hasValidateDeviceLwM2MCredentialsRequestMsg()) {
|
||||||
ValidateDeviceLwM2MCredentialsRequestMsg msg = transportApiRequestMsg.getValidateDeviceLwM2MCredentialsRequestMsg();
|
ValidateDeviceLwM2MCredentialsRequestMsg msg = transportApiRequestMsg.getValidateDeviceLwM2MCredentialsRequestMsg();
|
||||||
result = validateCredentials(msg.getCredentialsId(), DeviceCredentialsType.LWM2M_CREDENTIALS);
|
final String credentialsId = msg.getCredentialsId();
|
||||||
|
result = handlerExecutor.submit(() -> validateCredentials(credentialsId, DeviceCredentialsType.LWM2M_CREDENTIALS));
|
||||||
} else if (transportApiRequestMsg.hasProvisionDeviceRequestMsg()) {
|
} else if (transportApiRequestMsg.hasProvisionDeviceRequestMsg()) {
|
||||||
result = handle(transportApiRequestMsg.getProvisionDeviceRequestMsg());
|
result = handle(transportApiRequestMsg.getProvisionDeviceRequestMsg());
|
||||||
} else if (transportApiRequestMsg.hasResourceRequestMsg()) {
|
} else if (transportApiRequestMsg.hasResourceRequestMsg()) {
|
||||||
@ -197,24 +222,24 @@ public class DefaultTransportApiService implements TransportApiService {
|
|||||||
MoreExecutors.directExecutor());
|
MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<TransportApiResponseMsg> validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
|
private TransportApiResponseMsg validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
|
||||||
//TODO: Make async and enable caching
|
//TODO: Make async and enable caching
|
||||||
DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
|
DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
|
||||||
if (credentials != null && credentials.getCredentialsType() == credentialsType) {
|
if (credentials != null && credentials.getCredentialsType() == credentialsType) {
|
||||||
return getDeviceInfo(credentials);
|
return getDeviceInfo(credentials);
|
||||||
} else {
|
} else {
|
||||||
return getEmptyTransportApiResponseFuture();
|
return getEmptyTransportApiResponse();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<TransportApiResponseMsg> validateCredentials(TransportProtos.ValidateBasicMqttCredRequestMsg mqtt) {
|
private TransportApiResponseMsg validateCredentials(TransportProtos.ValidateBasicMqttCredRequestMsg mqtt) {
|
||||||
DeviceCredentials credentials;
|
DeviceCredentials credentials;
|
||||||
if (StringUtils.isEmpty(mqtt.getUserName())) {
|
if (StringUtils.isEmpty(mqtt.getUserName())) {
|
||||||
credentials = checkMqttCredentials(mqtt, EncryptionUtil.getSha3Hash(mqtt.getClientId()));
|
credentials = checkMqttCredentials(mqtt, EncryptionUtil.getSha3Hash(mqtt.getClientId()));
|
||||||
if (credentials != null) {
|
if (credentials != null) {
|
||||||
return getDeviceInfo(credentials);
|
return getDeviceInfo(credentials);
|
||||||
} else {
|
} else {
|
||||||
return getEmptyTransportApiResponseFuture();
|
return getEmptyTransportApiResponse();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(
|
credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(
|
||||||
@ -224,7 +249,7 @@ public class DefaultTransportApiService implements TransportApiService {
|
|||||||
if (VALID.equals(validationResult)) {
|
if (VALID.equals(validationResult)) {
|
||||||
return getDeviceInfo(credentials);
|
return getDeviceInfo(credentials);
|
||||||
} else if (PASSWORD_MISMATCH.equals(validationResult)) {
|
} else if (PASSWORD_MISMATCH.equals(validationResult)) {
|
||||||
return getEmptyTransportApiResponseFuture();
|
return getEmptyTransportApiResponse();
|
||||||
} else {
|
} else {
|
||||||
return validateUserNameCredentials(mqtt);
|
return validateUserNameCredentials(mqtt);
|
||||||
}
|
}
|
||||||
@ -234,7 +259,7 @@ public class DefaultTransportApiService implements TransportApiService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ListenableFuture<TransportApiResponseMsg> validateOrCreateDeviceX509Certificate(String certificateChain) {
|
protected TransportApiResponseMsg validateOrCreateDeviceX509Certificate(String certificateChain) {
|
||||||
List<String> chain = X509_CERTIFICATE_TRIM_CHAIN_PATTERN.matcher(certificateChain).results().map(match ->
|
List<String> chain = X509_CERTIFICATE_TRIM_CHAIN_PATTERN.matcher(certificateChain).results().map(match ->
|
||||||
EncryptionUtil.certTrimNewLines(match.group())).collect(Collectors.toList());
|
EncryptionUtil.certTrimNewLines(match.group())).collect(Collectors.toList());
|
||||||
for (String certificateValue : chain) {
|
for (String certificateValue : chain) {
|
||||||
@ -254,16 +279,16 @@ public class DefaultTransportApiService implements TransportApiService {
|
|||||||
}
|
}
|
||||||
} catch (ProvisionFailedException e) {
|
} catch (ProvisionFailedException e) {
|
||||||
log.debug("[{}][{}] Failed to provision device with cert chain: {}", deviceProfile.getTenantId(), deviceProfile.getId(), provisionRequest, e);
|
log.debug("[{}][{}] Failed to provision device with cert chain: {}", deviceProfile.getTenantId(), deviceProfile.getId(), provisionRequest, e);
|
||||||
return getEmptyTransportApiResponseFuture();
|
return getEmptyTransportApiResponse();
|
||||||
}
|
}
|
||||||
} else if (deviceProfile != null) {
|
} else if (deviceProfile != null) {
|
||||||
log.warn("[{}][{}] Device Profile provision configuration mismatched: expected {}, actual {}", deviceProfile.getTenantId(), deviceProfile.getId(), DeviceProfileProvisionType.X509_CERTIFICATE_CHAIN, deviceProfile.getProvisionType());
|
log.warn("[{}][{}] Device Profile provision configuration mismatched: expected {}, actual {}", deviceProfile.getTenantId(), deviceProfile.getId(), DeviceProfileProvisionType.X509_CERTIFICATE_CHAIN, deviceProfile.getProvisionType());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return getEmptyTransportApiResponseFuture();
|
return getEmptyTransportApiResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<TransportApiResponseMsg> validateUserNameCredentials(TransportProtos.ValidateBasicMqttCredRequestMsg mqtt) {
|
private TransportApiResponseMsg validateUserNameCredentials(TransportProtos.ValidateBasicMqttCredRequestMsg mqtt) {
|
||||||
DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(mqtt.getUserName());
|
DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(mqtt.getUserName());
|
||||||
if (credentials != null) {
|
if (credentials != null) {
|
||||||
switch (credentials.getCredentialsType()) {
|
switch (credentials.getCredentialsType()) {
|
||||||
@ -273,11 +298,11 @@ public class DefaultTransportApiService implements TransportApiService {
|
|||||||
if (VALID.equals(validateMqttCredentials(mqtt, credentials))) {
|
if (VALID.equals(validateMqttCredentials(mqtt, credentials))) {
|
||||||
return getDeviceInfo(credentials);
|
return getDeviceInfo(credentials);
|
||||||
} else {
|
} else {
|
||||||
return getEmptyTransportApiResponseFuture();
|
return getEmptyTransportApiResponse();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return getEmptyTransportApiResponseFuture();
|
return getEmptyTransportApiResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
private DeviceCredentials checkMqttCredentials(TransportProtos.ValidateBasicMqttCredRequestMsg clientCred, String credId) {
|
private DeviceCredentials checkMqttCredentials(TransportProtos.ValidateBasicMqttCredRequestMsg clientCred, String credId) {
|
||||||
@ -518,31 +543,30 @@ public class DefaultTransportApiService implements TransportApiService {
|
|||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceCredentials credentials) {
|
TransportApiResponseMsg getDeviceInfo(DeviceCredentials credentials) {
|
||||||
return Futures.transform(deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, credentials.getDeviceId()), device -> {
|
Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, credentials.getDeviceId());
|
||||||
if (device == null) {
|
if (device == null) {
|
||||||
log.trace("[{}] Failed to lookup device by id", credentials.getDeviceId());
|
log.trace("[{}] Failed to lookup device by id", credentials.getDeviceId());
|
||||||
return getEmptyTransportApiResponse();
|
return getEmptyTransportApiResponse();
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
ValidateDeviceCredentialsResponseMsg.Builder builder = ValidateDeviceCredentialsResponseMsg.newBuilder();
|
||||||
|
builder.setDeviceInfo(getDeviceInfoProto(device));
|
||||||
|
DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId());
|
||||||
|
if (deviceProfile != null) {
|
||||||
|
builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
|
||||||
|
} else {
|
||||||
|
log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId());
|
||||||
}
|
}
|
||||||
try {
|
if (!StringUtils.isEmpty(credentials.getCredentialsValue())) {
|
||||||
ValidateDeviceCredentialsResponseMsg.Builder builder = ValidateDeviceCredentialsResponseMsg.newBuilder();
|
builder.setCredentialsBody(credentials.getCredentialsValue());
|
||||||
builder.setDeviceInfo(getDeviceInfoProto(device));
|
|
||||||
DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId());
|
|
||||||
if (deviceProfile != null) {
|
|
||||||
builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
|
|
||||||
} else {
|
|
||||||
log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId());
|
|
||||||
}
|
|
||||||
if (!StringUtils.isEmpty(credentials.getCredentialsValue())) {
|
|
||||||
builder.setCredentialsBody(credentials.getCredentialsValue());
|
|
||||||
}
|
|
||||||
return TransportApiResponseMsg.newBuilder()
|
|
||||||
.setValidateCredResponseMsg(builder.build()).build();
|
|
||||||
} catch (JsonProcessingException e) {
|
|
||||||
log.warn("[{}] Failed to lookup device by id", credentials.getDeviceId(), e);
|
|
||||||
return getEmptyTransportApiResponse();
|
|
||||||
}
|
}
|
||||||
}, MoreExecutors.directExecutor());
|
return TransportApiResponseMsg.newBuilder()
|
||||||
|
.setValidateCredResponseMsg(builder.build()).build();
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
log.warn("[{}] Failed to lookup device by id", credentials.getDeviceId(), e);
|
||||||
|
return getEmptyTransportApiResponse();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private DeviceInfoProto getDeviceInfoProto(Device device) throws JsonProcessingException {
|
private DeviceInfoProto getDeviceInfoProto(Device device) throws JsonProcessingException {
|
||||||
|
|||||||
@ -1491,6 +1491,8 @@ queue:
|
|||||||
max_requests_timeout: "${TB_QUEUE_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
|
max_requests_timeout: "${TB_QUEUE_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
|
||||||
# Amount of threads used to invoke callbacks
|
# Amount of threads used to invoke callbacks
|
||||||
max_callback_threads: "${TB_QUEUE_TRANSPORT_MAX_CALLBACK_THREADS:100}"
|
max_callback_threads: "${TB_QUEUE_TRANSPORT_MAX_CALLBACK_THREADS:100}"
|
||||||
|
# Amount of threads used for transport API requests
|
||||||
|
max_core_handler_threads: "${TB_QUEUE_TRANSPORT_MAX_CORE_HANDLER_THREADS:16}"
|
||||||
# Interval in milliseconds to poll api requests from transport microservices
|
# Interval in milliseconds to poll api requests from transport microservices
|
||||||
request_poll_interval: "${TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}"
|
request_poll_interval: "${TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}"
|
||||||
# Interval in milliseconds to poll api response from transport microservices
|
# Interval in milliseconds to poll api response from transport microservices
|
||||||
|
|||||||
@ -16,7 +16,6 @@
|
|||||||
package org.thingsboard.server.service.transport;
|
package org.thingsboard.server.service.transport;
|
||||||
|
|
||||||
|
|
||||||
import com.google.common.util.concurrent.Futures;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -46,6 +45,7 @@ import org.thingsboard.server.dao.ota.OtaPackageService;
|
|||||||
import org.thingsboard.server.dao.queue.QueueService;
|
import org.thingsboard.server.dao.queue.QueueService;
|
||||||
import org.thingsboard.server.dao.relation.RelationService;
|
import org.thingsboard.server.dao.relation.RelationService;
|
||||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
||||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||||
@ -62,6 +62,8 @@ import java.util.regex.Matcher;
|
|||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.BDDMockito.willReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
@ -123,11 +125,13 @@ public class DefaultTransportApiServiceTest {
|
|||||||
@Test
|
@Test
|
||||||
public void validateExistingDeviceByX509CertificateStrategy() {
|
public void validateExistingDeviceByX509CertificateStrategy() {
|
||||||
var device = createDevice();
|
var device = createDevice();
|
||||||
when(deviceService.findDeviceByIdAsync(any(), any())).thenReturn(Futures.immediateFuture(device));
|
|
||||||
|
|
||||||
var deviceCredentials = createDeviceCredentials(chain[0], device.getId());
|
var deviceCredentials = createDeviceCredentials(chain[0], device.getId());
|
||||||
when(deviceCredentialsService.findDeviceCredentialsByCredentialsId(any())).thenReturn(deviceCredentials);
|
when(deviceCredentialsService.findDeviceCredentialsByCredentialsId(any())).thenReturn(deviceCredentials);
|
||||||
|
|
||||||
|
TransportProtos.TransportApiResponseMsg response = mock(TransportProtos.TransportApiResponseMsg.class);
|
||||||
|
willReturn(response).given(service).getDeviceInfo(deviceCredentials);
|
||||||
|
|
||||||
service.validateOrCreateDeviceX509Certificate(certificateChain);
|
service.validateOrCreateDeviceX509Certificate(certificateChain);
|
||||||
verify(deviceCredentialsService, times(1)).findDeviceCredentialsByCredentialsId(any());
|
verify(deviceCredentialsService, times(1)).findDeviceCredentialsByCredentialsId(any());
|
||||||
}
|
}
|
||||||
@ -139,7 +143,6 @@ public class DefaultTransportApiServiceTest {
|
|||||||
|
|
||||||
var device = createDevice();
|
var device = createDevice();
|
||||||
when(deviceService.findDeviceByTenantIdAndName(any(), any())).thenReturn(device);
|
when(deviceService.findDeviceByTenantIdAndName(any(), any())).thenReturn(device);
|
||||||
when(deviceService.findDeviceByIdAsync(any(), any())).thenReturn(Futures.immediateFuture(device));
|
|
||||||
|
|
||||||
var deviceCredentials = createDeviceCredentials(chain[0], device.getId());
|
var deviceCredentials = createDeviceCredentials(chain[0], device.getId());
|
||||||
when(deviceCredentialsService.findDeviceCredentialsByCredentialsId(any())).thenReturn(null);
|
when(deviceCredentialsService.findDeviceCredentialsByCredentialsId(any())).thenReturn(null);
|
||||||
@ -148,9 +151,12 @@ public class DefaultTransportApiServiceTest {
|
|||||||
var provisionResponse = createProvisionResponse(deviceCredentials);
|
var provisionResponse = createProvisionResponse(deviceCredentials);
|
||||||
when(deviceProvisionService.provisionDeviceViaX509Chain(any(), any())).thenReturn(provisionResponse);
|
when(deviceProvisionService.provisionDeviceViaX509Chain(any(), any())).thenReturn(provisionResponse);
|
||||||
|
|
||||||
|
TransportProtos.TransportApiResponseMsg response = mock(TransportProtos.TransportApiResponseMsg.class);
|
||||||
|
willReturn(response).given(service).getDeviceInfo(deviceCredentials);
|
||||||
|
|
||||||
service.validateOrCreateDeviceX509Certificate(certificateChain);
|
service.validateOrCreateDeviceX509Certificate(certificateChain);
|
||||||
verify(deviceProfileService, times(1)).findDeviceProfileByProvisionDeviceKey(any());
|
verify(deviceProfileService, times(1)).findDeviceProfileByProvisionDeviceKey(any());
|
||||||
verify(deviceService, times(1)).findDeviceByIdAsync(any(), any());
|
verify(service, times(1)).getDeviceInfo(any());
|
||||||
verify(deviceCredentialsService, times(1)).findDeviceCredentialsByCredentialsId(any());
|
verify(deviceCredentialsService, times(1)).findDeviceCredentialsByCredentialsId(any());
|
||||||
verify(deviceProvisionService, times(1)).provisionDeviceViaX509Chain(any(), any());
|
verify(deviceProvisionService, times(1)).provisionDeviceViaX509Chain(any(), any());
|
||||||
}
|
}
|
||||||
|
|||||||
@ -188,7 +188,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
if (deviceSessionCtx != null) {
|
if (deviceSessionCtx != null) {
|
||||||
deregisterSession(deviceName, deviceSessionCtx);
|
deregisterSession(deviceName, deviceSessionCtx);
|
||||||
} else {
|
} else {
|
||||||
log.debug("[{}] Device [{}] was already removed from the gateway session", sessionId, deviceName);
|
log.debug("[{}][{}][{}] Device [{}] was already removed from the gateway session", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -205,17 +205,17 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) {
|
protected void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) {
|
||||||
log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName);
|
log.trace("[{}][{}][{}] onDeviceConnect: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName);
|
||||||
Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<>() {
|
Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable T result) {
|
public void onSuccess(@Nullable T result) {
|
||||||
ack(msg, ReturnCode.SUCCESS);
|
ack(msg, ReturnCode.SUCCESS);
|
||||||
log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName);
|
log.trace("[{}][{}][{}] onDeviceConnectOk: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, t);
|
log.warn("[{}][{}][{}] Failed to process device connect command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
|
||||||
|
|
||||||
}
|
}
|
||||||
}, context.getExecutor());
|
}, context.getExecutor());
|
||||||
@ -259,7 +259,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
|
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
|
||||||
T deviceSessionCtx = newDeviceSessionCtx(msg);
|
T deviceSessionCtx = newDeviceSessionCtx(msg);
|
||||||
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
|
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
|
||||||
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
|
log.trace("[{}][{}][{}] First got or created device [{}], type [{}] for the gateway session", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, deviceType);
|
||||||
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
|
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
|
||||||
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
|
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
|
||||||
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
|
transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
|
||||||
@ -275,7 +275,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onError(Throwable e) {
|
public void onError(Throwable e) {
|
||||||
log.warn("[{}] Failed to process device connect command: {}", sessionId, deviceName, e);
|
log.warn("[{}][{}][{}] Failed to process device connect command at getDeviceCreationFuture: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, e);
|
||||||
futureToSet.setException(e);
|
futureToSet.setException(e);
|
||||||
deviceFutures.remove(deviceName);
|
deviceFutures.remove(deviceName);
|
||||||
}
|
}
|
||||||
@ -348,14 +348,14 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(deviceEntry.getValue().getAsJsonArray());
|
TransportProtos.PostTelemetryMsg postTelemetryMsg = JsonConverter.convertToTelemetryProto(deviceEntry.getValue().getAsJsonArray());
|
||||||
processPostTelemetryMsg(deviceCtx, postTelemetryMsg, deviceName, msgId);
|
processPostTelemetryMsg(deviceCtx, postTelemetryMsg, deviceName, msgId);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, deviceEntry.getValue(), e);
|
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, deviceEntry.getValue(), e);
|
||||||
channel.close();
|
channel.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t);
|
log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
|
||||||
}
|
}
|
||||||
}, context.getExecutor());
|
}, context.getExecutor());
|
||||||
}
|
}
|
||||||
@ -380,19 +380,19 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray());
|
TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray());
|
||||||
processPostTelemetryMsg(deviceCtx, postTelemetryMsg, deviceName, msgId);
|
processPostTelemetryMsg(deviceCtx, postTelemetryMsg, deviceName, msgId);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
log.warn("[{}][{}] Failed to convert telemetry: {}", gateway.getDeviceId(), deviceName, msg, e);
|
log.warn("[{}][{}][{}] Failed to convert telemetry: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, msg, e);
|
||||||
channel.close();
|
channel.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.debug("[{}] Failed to process device telemetry command: {}", sessionId, deviceName, t);
|
log.debug("[{}][{}][{}] Failed to process device telemetry command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
|
||||||
}
|
}
|
||||||
}, context.getExecutor());
|
}, context.getExecutor());
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
log.debug("[{}] Devices telemetry messages is empty for: [{}]", sessionId, gateway.getDeviceId());
|
log.debug("[{}][{}][{}] Devices telemetry messages is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
|
||||||
throw new IllegalArgumentException("[" + sessionId + "] Devices telemetry messages is empty for [" + gateway.getDeviceId() + "]");
|
throw new IllegalArgumentException("[" + sessionId + "] Devices telemetry messages is empty for [" + gateway.getDeviceId() + "]");
|
||||||
}
|
}
|
||||||
} catch (RuntimeException | InvalidProtocolBufferException e) {
|
} catch (RuntimeException | InvalidProtocolBufferException e) {
|
||||||
@ -433,13 +433,13 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, deviceEntry.getValue());
|
TransportProtos.ClaimDeviceMsg claimDeviceMsg = JsonConverter.convertToClaimDeviceProto(deviceId, deviceEntry.getValue());
|
||||||
processClaimDeviceMsg(deviceCtx, claimDeviceMsg, deviceName, msgId);
|
processClaimDeviceMsg(deviceCtx, claimDeviceMsg, deviceName, msgId);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
log.warn("[{}][{}] Failed to convert claim message: {}", gateway.getDeviceId(), deviceName, deviceEntry.getValue(), e);
|
log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, deviceEntry.getValue(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.debug("[{}] Failed to process device claiming command: {}", sessionId, deviceName, t);
|
log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
|
||||||
}
|
}
|
||||||
}, context.getExecutor());
|
}, context.getExecutor());
|
||||||
}
|
}
|
||||||
@ -468,18 +468,18 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
TransportProtos.ClaimDeviceMsg claimDeviceMsg = ProtoConverter.convertToClaimDeviceProto(deviceId, claimRequest.toByteArray());
|
TransportProtos.ClaimDeviceMsg claimDeviceMsg = ProtoConverter.convertToClaimDeviceProto(deviceId, claimRequest.toByteArray());
|
||||||
processClaimDeviceMsg(deviceCtx, claimDeviceMsg, deviceName, msgId);
|
processClaimDeviceMsg(deviceCtx, claimDeviceMsg, deviceName, msgId);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
log.warn("[{}][{}] Failed to convert claim message: {}", gateway.getDeviceId(), deviceName, claimRequest, e);
|
log.warn("[{}][{}][{}] Failed to convert claim message: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, claimRequest, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.debug("[{}] Failed to process device claiming command: {}", sessionId, deviceName, t);
|
log.debug("[{}][{}][{}] Failed to process device claiming command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
|
||||||
}
|
}
|
||||||
}, context.getExecutor());
|
}, context.getExecutor());
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
log.debug("[{}] Devices claim messages is empty for: [{}]", sessionId, gateway.getDeviceId());
|
log.debug("[{}][{}][{}] Devices claim messages is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
|
||||||
throw new IllegalArgumentException("[" + sessionId + "] Devices claim messages is empty for [" + gateway.getDeviceId() + "]");
|
throw new IllegalArgumentException("[" + sessionId + "] Devices claim messages is empty for [" + gateway.getDeviceId() + "]");
|
||||||
}
|
}
|
||||||
} catch (RuntimeException | InvalidProtocolBufferException e) {
|
} catch (RuntimeException | InvalidProtocolBufferException e) {
|
||||||
@ -510,7 +510,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t);
|
log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
|
||||||
}
|
}
|
||||||
}, context.getExecutor());
|
}, context.getExecutor());
|
||||||
}
|
}
|
||||||
@ -538,18 +538,18 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
|
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto);
|
||||||
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId);
|
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e);
|
log.warn("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), deviceName, kvListProto, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.debug("[{}] Failed to process device attributes command: {}", sessionId, deviceName, t);
|
log.debug("[{}][{}][{}] Failed to process device attributes command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
|
||||||
}
|
}
|
||||||
}, context.getExecutor());
|
}, context.getExecutor());
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
log.debug("[{}] Devices attributes keys list is empty for: [{}]", sessionId, gateway.getDeviceId());
|
log.debug("[{}][{}][{}] Devices attributes keys list is empty", gateway.getTenantId(), gateway.getDeviceId(), sessionId);
|
||||||
throw new IllegalArgumentException("[" + sessionId + "] Devices attributes keys list is empty for [" + gateway.getDeviceId() + "]");
|
throw new IllegalArgumentException("[" + sessionId + "] Devices attributes keys list is empty for [" + gateway.getDeviceId() + "]");
|
||||||
}
|
}
|
||||||
} catch (RuntimeException | InvalidProtocolBufferException e) {
|
} catch (RuntimeException | InvalidProtocolBufferException e) {
|
||||||
@ -618,7 +618,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.debug("[{}] Failed to process device Rpc response command: {}", sessionId, deviceName, t);
|
log.debug("[{}][{}][{}] Failed to process device Rpc response command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
|
||||||
}
|
}
|
||||||
}, context.getExecutor());
|
}, context.getExecutor());
|
||||||
} else {
|
} else {
|
||||||
@ -643,7 +643,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
log.debug("[{}] Failed to process device Rpc response command: {}", sessionId, deviceName, t);
|
log.debug("[{}][{}][{}] Failed to process device Rpc response command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
|
||||||
}
|
}
|
||||||
}, context.getExecutor());
|
}, context.getExecutor());
|
||||||
} catch (RuntimeException | InvalidProtocolBufferException e) {
|
} catch (RuntimeException | InvalidProtocolBufferException e) {
|
||||||
@ -667,7 +667,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
ack(mqttMsg, ReturnCode.IMPLEMENTATION_SPECIFIC);
|
ack(mqttMsg, ReturnCode.IMPLEMENTATION_SPECIFIC);
|
||||||
log.debug("[{}] Failed to process device attributes request command: {}", sessionId, deviceName, t);
|
log.debug("[{}][{}][{}] Failed to process device attributes request command: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, t);
|
||||||
}
|
}
|
||||||
}, context.getExecutor());
|
}, context.getExecutor());
|
||||||
}
|
}
|
||||||
@ -687,7 +687,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
protected ListenableFuture<T> checkDeviceConnected(String deviceName) {
|
protected ListenableFuture<T> checkDeviceConnected(String deviceName) {
|
||||||
T ctx = devices.get(deviceName);
|
T ctx = devices.get(deviceName);
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
log.debug("[{}] Missing device [{}] for the gateway session", sessionId, deviceName);
|
log.debug("[{}][{}][{}] Missing device [{}] for the gateway session", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName);
|
||||||
return onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE);
|
return onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE);
|
||||||
} else {
|
} else {
|
||||||
return Futures.immediateFuture(ctx);
|
return Futures.immediateFuture(ctx);
|
||||||
@ -733,7 +733,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
}
|
}
|
||||||
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
|
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
|
||||||
transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
|
transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
|
||||||
log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
|
log.debug("[{}][{}][{}] Removed device [{}] from the gateway session", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sendSparkplugStateOnTelemetry(TransportProtos.SessionInfoProto sessionInfo, String deviceName, SparkplugConnectionState connectionState, long ts) {
|
public void sendSparkplugStateOnTelemetry(TransportProtos.SessionInfoProto sessionInfo, String deviceName, SparkplugConnectionState connectionState, long ts) {
|
||||||
@ -749,7 +749,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
return new TransportServiceCallback<Void>() {
|
return new TransportServiceCallback<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(Void dummy) {
|
public void onSuccess(Void dummy) {
|
||||||
log.trace("[{}][{}] Published msg: {}", sessionId, deviceName, msg);
|
log.trace("[{}][{}][{}][{}] Published msg: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, deviceName, msg);
|
||||||
if (msgId > 0) {
|
if (msgId > 0) {
|
||||||
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, ReturnCode.SUCCESS));
|
ctx.writeAndFlush(MqttTransportHandler.createMqttPubAckMsg(deviceSessionCtx, msgId, ReturnCode.SUCCESS));
|
||||||
}
|
}
|
||||||
@ -757,7 +757,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onError(Throwable e) {
|
public void onError(Throwable e) {
|
||||||
log.trace("[{}] Failed to publish msg: {} for device: {}", sessionId, msg, deviceName, e);
|
log.trace("[{}][{}][{}] Failed to publish msg: [{}] for device: [{}]", gateway.getTenantId(), gateway.getDeviceId(), sessionId, msg, deviceName, e);
|
||||||
ctx.close();
|
ctx.close();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@ -82,7 +82,7 @@ public class DeviceCredentialsServiceImpl extends AbstractCachedEntityService<St
|
|||||||
validateString(credentialsId, "Incorrect credentialsId " + credentialsId);
|
validateString(credentialsId, "Incorrect credentialsId " + credentialsId);
|
||||||
return cache.getAndPutInTransaction(credentialsId,
|
return cache.getAndPutInTransaction(credentialsId,
|
||||||
() -> deviceCredentialsDao.findByCredentialsId(TenantId.SYS_TENANT_ID, credentialsId),
|
() -> deviceCredentialsDao.findByCredentialsId(TenantId.SYS_TENANT_ID, credentialsId),
|
||||||
false);
|
true); // caching null values is essential for permanently invalid requests
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.dao.sql.device;
|
package org.thingsboard.server.dao.sql.device;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.data.jpa.repository.JpaRepository;
|
import org.springframework.data.jpa.repository.JpaRepository;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
@ -33,6 +34,7 @@ import java.util.UUID;
|
|||||||
/**
|
/**
|
||||||
* Created by Valerii Sosliuk on 5/6/2017.
|
* Created by Valerii Sosliuk on 5/6/2017.
|
||||||
*/
|
*/
|
||||||
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@SqlDao
|
@SqlDao
|
||||||
public class JpaDeviceCredentialsDao extends JpaAbstractDao<DeviceCredentialsEntity, DeviceCredentials> implements DeviceCredentialsDao {
|
public class JpaDeviceCredentialsDao extends JpaAbstractDao<DeviceCredentialsEntity, DeviceCredentials> implements DeviceCredentialsDao {
|
||||||
@ -65,6 +67,7 @@ public class JpaDeviceCredentialsDao extends JpaAbstractDao<DeviceCredentialsEnt
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DeviceCredentials findByCredentialsId(TenantId tenantId, String credentialsId) {
|
public DeviceCredentials findByCredentialsId(TenantId tenantId, String credentialsId) {
|
||||||
|
log.trace("[{}] findByCredentialsId [{}]", tenantId, credentialsId);
|
||||||
return DaoUtil.getData(deviceCredentialsRepository.findByCredentialsId(credentialsId));
|
return DaoUtil.getData(deviceCredentialsRepository.findByCredentialsId(credentialsId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -127,10 +127,10 @@ public class DeviceCredentialsCacheTest extends AbstractServiceTest {
|
|||||||
|
|
||||||
when(deviceCredentialsDao.findByCredentialsId(SYSTEM_TENANT_ID, CREDENTIALS_ID_1)).thenReturn(null);
|
when(deviceCredentialsDao.findByCredentialsId(SYSTEM_TENANT_ID, CREDENTIALS_ID_1)).thenReturn(null);
|
||||||
|
|
||||||
deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1);
|
deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1); // cache miss, DB read (not found), Cache put null value
|
||||||
deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1);
|
deviceCredentialsService.findDeviceCredentialsByCredentialsId(CREDENTIALS_ID_1); // cache hit (null value, credentials not found)
|
||||||
|
|
||||||
verify(deviceCredentialsDao, times(3)).findByCredentialsId(SYSTEM_TENANT_ID, CREDENTIALS_ID_1);
|
verify(deviceCredentialsDao, times(2)).findByCredentialsId(SYSTEM_TENANT_ID, CREDENTIALS_ID_1);
|
||||||
}
|
}
|
||||||
|
|
||||||
private DeviceCredentialsService unwrapDeviceCredentialsService() throws Exception {
|
private DeviceCredentialsService unwrapDeviceCredentialsService() throws Exception {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user