diff --git a/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java b/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java index 59b7da2e76..a0d3727ce0 100644 --- a/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java +++ b/application/src/main/java/org/thingsboard/server/config/WebSocketConfiguration.java @@ -41,7 +41,6 @@ import java.util.Map; public class WebSocketConfiguration implements WebSocketConfigurer { public static final String WS_PLUGIN_PREFIX = "/api/ws/plugins/"; - public static final String WS_SECURITY_USER_ATTRIBUTE = "SECURITY_USER"; private static final String WS_PLUGIN_MAPPING = WS_PLUGIN_PREFIX + "**"; @Bean @@ -68,7 +67,6 @@ public class WebSocketConfiguration implements WebSocketConfigurer { response.setStatusCode(HttpStatus.UNAUTHORIZED); return false; } else { - attributes.put(WS_SECURITY_USER_ATTRIBUTE, user); return true; } } diff --git a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java index f579db8588..c8b4553415 100644 --- a/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java +++ b/application/src/main/java/org/thingsboard/server/controller/plugin/TbWebSocketHandler.java @@ -16,14 +16,17 @@ package org.thingsboard.server.controller.plugin; import lombok.extern.slf4j.Slf4j; +import org.apache.tomcat.websocket.Constants; import org.springframework.beans.factory.BeanCreationNotAllowedException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.security.core.Authentication; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.adapter.NativeWebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.id.CustomerId; @@ -38,6 +41,7 @@ import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint; import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; +import javax.websocket.Session; import java.io.IOException; import java.net.URI; import java.security.InvalidParameterException; @@ -56,6 +60,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr @Autowired private TelemetryWebSocketService webSocketService; + @Value("${server.ws.blocking_send_timeout:5000}") + private long blockingSendTimeout; + @Value("${server.ws.limits.max_sessions_per_tenant:0}") private int maxSessionsPerTenant; @Value("${server.ws.limits.max_sessions_per_customer:0}") @@ -96,6 +103,12 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr public void afterConnectionEstablished(WebSocketSession session) throws Exception { super.afterConnectionEstablished(session); try { + if (session instanceof NativeWebSocketSession) { + Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class); + if (nativeSession != null) { + nativeSession.getUserProperties().put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, new Long(blockingSendTimeout)); + } + } String internalSessionId = session.getId(); TelemetryWebSocketSessionRef sessionRef = toRef(session); String externalSessionId = sessionRef.getSessionId(); @@ -159,7 +172,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr if (!"telemetry".equalsIgnoreCase(serviceToken)) { throw new InvalidParameterException("Can't find plugin with specified token!"); } else { - SecurityUser currentUser = (SecurityUser) session.getAttributes().get(WebSocketConfiguration.WS_SECURITY_USER_ATTRIBUTE); + SecurityUser currentUser = (SecurityUser) ((Authentication)session.getPrincipal()).getPrincipal(); return new TelemetryWebSocketSessionRef(UUID.randomUUID().toString(), currentUser, session.getLocalAddress(), session.getRemoteAddress()); } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index 6b87e033fc..6bda32dbff 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -75,14 +75,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -137,7 +130,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi @PostConstruct public void initExecutor() { - executor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + executor = Executors.newWorkStealingPool(50); } @PreDestroy diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java index 3d12938903..7d4d93f4ea 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportService.java @@ -23,28 +23,10 @@ import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.util.DonAsynchron; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.transport.SessionMsgListener; -import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.service.AbstractTransportService; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg; -import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; -import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; -import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; -import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; -import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; -import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.*; import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; import org.thingsboard.server.service.encoding.DataDecodingEncodingService; @@ -53,15 +35,6 @@ import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWra import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java index 2ced33a90b..5afd09ea6d 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java @@ -28,11 +28,7 @@ import org.thingsboard.server.kafka.*; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * Created by ashvayka on 05.10.18. @@ -68,7 +64,7 @@ public class RemoteTransportApiService { @PostConstruct public void init() { - this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + this.transportCallbackExecutor = Executors.newWorkStealingPool(100); TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder responseBuilder = TBKafkaProducerTemplate.builder(); responseBuilder.settings(kafkaSettings); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c19f6940a4..e98ea17b05 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -33,6 +33,7 @@ server: key-alias: "${SSL_KEY_ALIAS:tomcat}" log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:true}" ws: + blocking_send_timeout: "${TB_SERVER_WS_BLOCKING_SEND_TIMEOUT:5000}" limits: # Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}" @@ -167,6 +168,9 @@ cassandra: buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}" concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}" permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}" + dispatcher_threads: "${CASSANDRA_QUERY_DISPATCHER_THREADS:2}" + callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}" + poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}" rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}" tenant_rate_limits: enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}" diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java index 938584460f..5a81b97aed 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java @@ -28,15 +28,7 @@ import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.gen.transport.TransportProtos; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * Created by ashvayka on 17.10.18. @@ -278,7 +270,7 @@ public abstract class AbstractTransportService implements TransportService { new TbRateLimits(perDevicesLimitsConf); } this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(); - this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + this.transportCallbackExecutor = Executors.newWorkStealingPool(20); this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java index 7de20d99b2..f58f035013 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/CassandraBufferedRateExecutor.java @@ -52,7 +52,7 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor< @Value("${cassandra.query.concurrent_limit}") int concurrencyLimit, @Value("${cassandra.query.permit_max_wait_time}") long maxWaitTime, @Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads, - @Value("${cassandra.query.callback_threads:2}") int callbackThreads, + @Value("${cassandra.query.callback_threads:4}") int callbackThreads, @Value("${cassandra.query.poll_ms:50}") long pollMs, @Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled, @Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration, diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index b204a44302..a5415f0a1c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -25,17 +25,7 @@ import org.thingsboard.server.common.msg.tools.TbRateLimits; import javax.annotation.Nullable; import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** @@ -72,7 +62,7 @@ public abstract class AbstractBufferedRateExecutor(queueLimit); this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads); - this.callbackExecutor = new ThreadPoolExecutor(callbackThreads, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + this.callbackExecutor = Executors.newWorkStealingPool(callbackThreads); this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); this.perTenantLimitsEnabled = perTenantLimitsEnabled; this.perTenantLimitsConfiguration = perTenantLimitsConfiguration;