Merge branch 'master' of github.com:thingsboard/thingsboard
This commit is contained in:
commit
3bc3160b21
@ -41,7 +41,6 @@ import java.util.Map;
|
||||
public class WebSocketConfiguration implements WebSocketConfigurer {
|
||||
|
||||
public static final String WS_PLUGIN_PREFIX = "/api/ws/plugins/";
|
||||
public static final String WS_SECURITY_USER_ATTRIBUTE = "SECURITY_USER";
|
||||
private static final String WS_PLUGIN_MAPPING = WS_PLUGIN_PREFIX + "**";
|
||||
|
||||
@Bean
|
||||
@ -68,7 +67,6 @@ public class WebSocketConfiguration implements WebSocketConfigurer {
|
||||
response.setStatusCode(HttpStatus.UNAUTHORIZED);
|
||||
return false;
|
||||
} else {
|
||||
attributes.put(WS_SECURITY_USER_ATTRIBUTE, user);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -16,14 +16,17 @@
|
||||
package org.thingsboard.server.controller.plugin;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.tomcat.websocket.Constants;
|
||||
import org.springframework.beans.factory.BeanCreationNotAllowedException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.security.core.Authentication;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.socket.CloseStatus;
|
||||
import org.springframework.web.socket.TextMessage;
|
||||
import org.springframework.web.socket.WebSocketSession;
|
||||
import org.springframework.web.socket.adapter.NativeWebSocketSession;
|
||||
import org.springframework.web.socket.handler.TextWebSocketHandler;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
@ -38,6 +41,7 @@ import org.thingsboard.server.service.telemetry.TelemetryWebSocketMsgEndpoint;
|
||||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketService;
|
||||
import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef;
|
||||
|
||||
import javax.websocket.Session;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.security.InvalidParameterException;
|
||||
@ -56,6 +60,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
||||
@Autowired
|
||||
private TelemetryWebSocketService webSocketService;
|
||||
|
||||
@Value("${server.ws.blocking_send_timeout:5000}")
|
||||
private long blockingSendTimeout;
|
||||
|
||||
@Value("${server.ws.limits.max_sessions_per_tenant:0}")
|
||||
private int maxSessionsPerTenant;
|
||||
@Value("${server.ws.limits.max_sessions_per_customer:0}")
|
||||
@ -96,6 +103,12 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
||||
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
|
||||
super.afterConnectionEstablished(session);
|
||||
try {
|
||||
if (session instanceof NativeWebSocketSession) {
|
||||
Session nativeSession = ((NativeWebSocketSession)session).getNativeSession(Session.class);
|
||||
if (nativeSession != null) {
|
||||
nativeSession.getUserProperties().put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, new Long(blockingSendTimeout));
|
||||
}
|
||||
}
|
||||
String internalSessionId = session.getId();
|
||||
TelemetryWebSocketSessionRef sessionRef = toRef(session);
|
||||
String externalSessionId = sessionRef.getSessionId();
|
||||
@ -159,7 +172,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements Telemetr
|
||||
if (!"telemetry".equalsIgnoreCase(serviceToken)) {
|
||||
throw new InvalidParameterException("Can't find plugin with specified token!");
|
||||
} else {
|
||||
SecurityUser currentUser = (SecurityUser) session.getAttributes().get(WebSocketConfiguration.WS_SECURITY_USER_ATTRIBUTE);
|
||||
SecurityUser currentUser = (SecurityUser) ((Authentication)session.getPrincipal()).getPrincipal();
|
||||
return new TelemetryWebSocketSessionRef(UUID.randomUUID().toString(), currentUser, session.getLocalAddress(), session.getRemoteAddress());
|
||||
}
|
||||
}
|
||||
|
||||
@ -75,14 +75,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -137,7 +130,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
|
||||
|
||||
@PostConstruct
|
||||
public void initExecutor() {
|
||||
executor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
|
||||
executor = Executors.newWorkStealingPool(50);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -23,28 +23,10 @@ import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.rule.engine.api.util.DonAsynchron;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
||||
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||
import org.thingsboard.server.common.transport.TransportService;
|
||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||
import org.thingsboard.server.common.transport.service.AbstractTransportService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.*;
|
||||
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
|
||||
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
|
||||
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
|
||||
@ -53,15 +35,6 @@ import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWra
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
|
||||
@ -28,11 +28,7 @@ import org.thingsboard.server.kafka.*;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 05.10.18.
|
||||
@ -68,7 +64,7 @@ public class RemoteTransportApiService {
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
this.transportCallbackExecutor = new ThreadPoolExecutor(0, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
|
||||
this.transportCallbackExecutor = Executors.newWorkStealingPool(100);
|
||||
|
||||
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
|
||||
responseBuilder.settings(kafkaSettings);
|
||||
|
||||
@ -33,6 +33,7 @@ server:
|
||||
key-alias: "${SSL_KEY_ALIAS:tomcat}"
|
||||
log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:true}"
|
||||
ws:
|
||||
blocking_send_timeout: "${TB_SERVER_WS_BLOCKING_SEND_TIMEOUT:5000}"
|
||||
limits:
|
||||
# Limit the amount of sessions and subscriptions available on each server. Put values to zero to disable particular limitation
|
||||
max_sessions_per_tenant: "${TB_SERVER_WS_TENANT_RATE_LIMITS_MAX_SESSIONS_PER_TENANT:0}"
|
||||
@ -167,6 +168,9 @@ cassandra:
|
||||
buffer_size: "${CASSANDRA_QUERY_BUFFER_SIZE:200000}"
|
||||
concurrent_limit: "${CASSANDRA_QUERY_CONCURRENT_LIMIT:1000}"
|
||||
permit_max_wait_time: "${PERMIT_MAX_WAIT_TIME:120000}"
|
||||
dispatcher_threads: "${CASSANDRA_QUERY_DISPATCHER_THREADS:2}"
|
||||
callback_threads: "${CASSANDRA_QUERY_CALLBACK_THREADS:4}"
|
||||
poll_ms: "${CASSANDRA_QUERY_POLL_MS:50}"
|
||||
rate_limit_print_interval_ms: "${CASSANDRA_QUERY_RATE_LIMIT_PRINT_MS:10000}"
|
||||
tenant_rate_limits:
|
||||
enabled: "${CASSANDRA_QUERY_TENANT_RATE_LIMITS_ENABLED:false}"
|
||||
|
||||
@ -28,15 +28,7 @@ import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 17.10.18.
|
||||
@ -278,7 +270,7 @@ public abstract class AbstractTransportService implements TransportService {
|
||||
new TbRateLimits(perDevicesLimitsConf);
|
||||
}
|
||||
this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
this.transportCallbackExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
|
||||
this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
|
||||
this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, sessionReportTimeout, sessionReportTimeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
|
||||
@ -52,7 +52,7 @@ public class CassandraBufferedRateExecutor extends AbstractBufferedRateExecutor<
|
||||
@Value("${cassandra.query.concurrent_limit}") int concurrencyLimit,
|
||||
@Value("${cassandra.query.permit_max_wait_time}") long maxWaitTime,
|
||||
@Value("${cassandra.query.dispatcher_threads:2}") int dispatcherThreads,
|
||||
@Value("${cassandra.query.callback_threads:2}") int callbackThreads,
|
||||
@Value("${cassandra.query.callback_threads:4}") int callbackThreads,
|
||||
@Value("${cassandra.query.poll_ms:50}") long pollMs,
|
||||
@Value("${cassandra.query.tenant_rate_limits.enabled}") boolean tenantRateLimitsEnabled,
|
||||
@Value("${cassandra.query.tenant_rate_limits.configuration}") String tenantRateLimitsConfiguration,
|
||||
|
||||
@ -25,17 +25,7 @@ import org.thingsboard.server.common.msg.tools.TbRateLimits;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
@ -72,7 +62,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
|
||||
this.concurrencyLimit = concurrencyLimit;
|
||||
this.queue = new LinkedBlockingDeque<>(queueLimit);
|
||||
this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads);
|
||||
this.callbackExecutor = new ThreadPoolExecutor(callbackThreads, 50, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
|
||||
this.callbackExecutor = Executors.newWorkStealingPool(callbackThreads);
|
||||
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
|
||||
this.perTenantLimitsEnabled = perTenantLimitsEnabled;
|
||||
this.perTenantLimitsConfiguration = perTenantLimitsConfiguration;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user