Merge remote-tracking branch 'origin/hotfix/3.7' into master-hotfix
This commit is contained in:
commit
7007563627
@ -0,0 +1,68 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2024 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.springframework.http.converter.xml;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.http.converter.json.AbstractJackson2HttpMessageConverter;
|
||||||
|
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
import java.lang.reflect.Type;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RestTemplate firstly uses MappingJackson2XmlHttpMessageConverter converter instead of MappingJackson2HttpMessageConverter.
|
||||||
|
* It produces error UnsupportedMediaType, so this converter had to be shadowed for read and write operations to use the correct converter
|
||||||
|
*/
|
||||||
|
public class MappingJackson2XmlHttpMessageConverter extends AbstractJackson2HttpMessageConverter {
|
||||||
|
private static final List<MediaType> problemDetailMediaTypes;
|
||||||
|
|
||||||
|
public MappingJackson2XmlHttpMessageConverter() {
|
||||||
|
this(Jackson2ObjectMapperBuilder.xml().build());
|
||||||
|
}
|
||||||
|
|
||||||
|
public MappingJackson2XmlHttpMessageConverter(ObjectMapper objectMapper) {
|
||||||
|
super(objectMapper, new MediaType[]{new MediaType("application", "xml", StandardCharsets.UTF_8), new MediaType("text", "xml", StandardCharsets.UTF_8), new MediaType("application", "*+xml", StandardCharsets.UTF_8)});
|
||||||
|
Assert.isInstanceOf(XmlMapper.class, objectMapper, "XmlMapper required");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setObjectMapper(ObjectMapper objectMapper) {
|
||||||
|
Assert.isInstanceOf(XmlMapper.class, objectMapper, "XmlMapper required");
|
||||||
|
super.setObjectMapper(objectMapper);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected List<MediaType> getMediaTypesForProblemDetail() {
|
||||||
|
return problemDetailMediaTypes;
|
||||||
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
problemDetailMediaTypes = Collections.singletonList(MediaType.APPLICATION_PROBLEM_XML);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean canRead(Type type, Class<?> contextClass, MediaType mediaType) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean canWrite(Class<?> clazz, MediaType mediaType) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -44,6 +44,7 @@ import org.thingsboard.server.common.data.query.EntityDataQuery;
|
|||||||
import org.thingsboard.server.common.data.query.EntityKey;
|
import org.thingsboard.server.common.data.query.EntityKey;
|
||||||
import org.thingsboard.server.common.data.query.EntityKeyType;
|
import org.thingsboard.server.common.data.query.EntityKeyType;
|
||||||
import org.thingsboard.server.common.data.query.TsValue;
|
import org.thingsboard.server.common.data.query.TsValue;
|
||||||
|
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
|
||||||
import org.thingsboard.server.dao.alarm.AlarmService;
|
import org.thingsboard.server.dao.alarm.AlarmService;
|
||||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||||
import org.thingsboard.server.dao.entity.EntityService;
|
import org.thingsboard.server.dao.entity.EntityService;
|
||||||
@ -355,6 +356,9 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
|
|||||||
|
|
||||||
private void handleWsCmdRuntimeException(String sessionId, RuntimeException e, EntityDataCmd cmd) {
|
private void handleWsCmdRuntimeException(String sessionId, RuntimeException e, EntityDataCmd cmd) {
|
||||||
log.debug("[{}] Failed to process ws cmd: {}", sessionId, cmd, e);
|
log.debug("[{}] Failed to process ws cmd: {}", sessionId, cmd, e);
|
||||||
|
if (e instanceof TbRateLimitsException) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
wsService.close(sessionId, CloseStatus.SERVICE_RESTARTED);
|
wsService.close(sessionId, CloseStatus.SERVICE_RESTARTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -18,12 +18,15 @@ package org.thingsboard.server.service.subscription;
|
|||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import jakarta.annotation.PreDestroy;
|
import jakarta.annotation.PreDestroy;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.context.annotation.Lazy;
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.context.event.EventListener;
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.thingsboard.common.util.DeduplicationUtil;
|
||||||
import org.thingsboard.common.util.DonAsynchron;
|
import org.thingsboard.common.util.DonAsynchron;
|
||||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
|
import org.thingsboard.server.cache.limits.RateLimitService;
|
||||||
import org.thingsboard.server.cluster.TbClusterService;
|
import org.thingsboard.server.cluster.TbClusterService;
|
||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
@ -36,10 +39,12 @@ import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
|
|||||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
|
import org.thingsboard.server.common.data.limit.LimitedApi;
|
||||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
|
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
|
||||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
@ -48,6 +53,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
|||||||
import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent;
|
import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent;
|
||||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||||
import org.thingsboard.server.service.ws.WebSocketService;
|
import org.thingsboard.server.service.ws.WebSocketService;
|
||||||
|
import org.thingsboard.server.service.ws.WebSocketSessionRef;
|
||||||
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
|
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
|
||||||
import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate;
|
import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate;
|
||||||
import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate;
|
import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate;
|
||||||
@ -88,13 +94,20 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
|||||||
private final TbClusterService clusterService;
|
private final TbClusterService clusterService;
|
||||||
private final SubscriptionManagerService subscriptionManagerService;
|
private final SubscriptionManagerService subscriptionManagerService;
|
||||||
private final WebSocketService webSocketService;
|
private final WebSocketService webSocketService;
|
||||||
|
private final RateLimitService rateLimitService;
|
||||||
|
|
||||||
private ExecutorService tsCallBackExecutor;
|
private ExecutorService tsCallBackExecutor;
|
||||||
private ScheduledExecutorService staleSessionCleanupExecutor;
|
private ScheduledExecutorService staleSessionCleanupExecutor;
|
||||||
|
|
||||||
|
@Value("${server.ws.rate_limits.subscriptions_per_tenant:2000:60}")
|
||||||
|
private String subscriptionsPerTenantRateLimit;
|
||||||
|
@Value("${server.ws.rate_limits.subscriptions_per_user:500:60}")
|
||||||
|
private String subscriptionsPerUserRateLimit;
|
||||||
|
|
||||||
public DefaultTbLocalSubscriptionService(AttributesService attrService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider,
|
public DefaultTbLocalSubscriptionService(AttributesService attrService, TimeseriesService tsService, TbServiceInfoProvider serviceInfoProvider,
|
||||||
PartitionService partitionService, TbClusterService clusterService,
|
PartitionService partitionService, TbClusterService clusterService,
|
||||||
@Lazy SubscriptionManagerService subscriptionManagerService, @Lazy WebSocketService webSocketService) {
|
@Lazy SubscriptionManagerService subscriptionManagerService, @Lazy WebSocketService webSocketService,
|
||||||
|
RateLimitService rateLimitService) {
|
||||||
this.attrService = attrService;
|
this.attrService = attrService;
|
||||||
this.tsService = tsService;
|
this.tsService = tsService;
|
||||||
this.serviceInfoProvider = serviceInfoProvider;
|
this.serviceInfoProvider = serviceInfoProvider;
|
||||||
@ -102,6 +115,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
|||||||
this.clusterService = clusterService;
|
this.clusterService = clusterService;
|
||||||
this.subscriptionManagerService = subscriptionManagerService;
|
this.subscriptionManagerService = subscriptionManagerService;
|
||||||
this.webSocketService = webSocketService;
|
this.webSocketService = webSocketService;
|
||||||
|
this.rateLimitService = rateLimitService;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String serviceId;
|
private String serviceId;
|
||||||
@ -185,9 +199,18 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addSubscription(TbSubscription<?> subscription) {
|
public void addSubscription(TbSubscription<?> subscription, WebSocketSessionRef sessionRef) {
|
||||||
TenantId tenantId = subscription.getTenantId();
|
TenantId tenantId = subscription.getTenantId();
|
||||||
EntityId entityId = subscription.getEntityId();
|
EntityId entityId = subscription.getEntityId();
|
||||||
|
if (!rateLimitService.checkRateLimit(LimitedApi.WS_SUBSCRIPTIONS, (Object) tenantId, subscriptionsPerTenantRateLimit)) {
|
||||||
|
handleRateLimitError(subscription, sessionRef, "Exceeded rate limit for WS subscriptions per tenant");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (sessionRef.getSecurityCtx() != null && !rateLimitService.checkRateLimit(LimitedApi.WS_SUBSCRIPTIONS, sessionRef.getSecurityCtx().getId(), subscriptionsPerUserRateLimit)) {
|
||||||
|
handleRateLimitError(subscription, sessionRef, "Exceeded rate limit for WS subscriptions per user");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription);
|
log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription);
|
||||||
SubscriptionModificationResult result;
|
SubscriptionModificationResult result;
|
||||||
subsLock.lock();
|
subsLock.lock();
|
||||||
@ -584,4 +607,13 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
|||||||
subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale);
|
subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void handleRateLimitError(TbSubscription<?> subscription, WebSocketSessionRef sessionRef, String message) {
|
||||||
|
String deduplicationKey = sessionRef.getSessionId() + message;
|
||||||
|
if (!DeduplicationUtil.alreadyProcessed(deduplicationKey, TimeUnit.SECONDS.toMillis(15))) {
|
||||||
|
log.info("{} {}", sessionRef, message);
|
||||||
|
webSocketService.sendError(sessionRef, subscription.getSubscriptionId(), SubscriptionErrorCode.BAD_REQUEST, message);
|
||||||
|
}
|
||||||
|
throw new TbRateLimitsException(message);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -132,7 +132,7 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
|
|||||||
int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
|
int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
|
||||||
subToEntityIdMap.put(subIdx, entityData.getEntityId());
|
subToEntityIdMap.put(subIdx, entityData.getEntityId());
|
||||||
localSubscriptionService.addSubscription(
|
localSubscriptionService.addSubscription(
|
||||||
createTsSub(entityData, subIdx, false, startTs, endTs, keyStates, resultToLatestValues));
|
createTsSub(entityData, subIdx, false, startTs, endTs, keyStates, resultToLatestValues), sessionRef);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -140,7 +140,7 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
|
|||||||
Map<EntityKeyType, List<EntityKey>> keysByType = getEntityKeyByTypeMap(keys);
|
Map<EntityKeyType, List<EntityKey>> keysByType = getEntityKeyByTypeMap(keys);
|
||||||
for (EntityData entityData : data.getData()) {
|
for (EntityData entityData : data.getData()) {
|
||||||
List<TbSubscription> entitySubscriptions = addSubscriptions(entityData, keysByType, latestValues, startTs, endTs);
|
List<TbSubscription> entitySubscriptions = addSubscriptions(entityData, keysByType, latestValues, startTs, endTs);
|
||||||
entitySubscriptions.forEach(localSubscriptionService::addSubscription);
|
entitySubscriptions.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -254,4 +254,5 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
|
|||||||
abstract void sendWsMsg(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType, boolean resultToLatestValues);
|
abstract void sendWsMsg(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, EntityKeyType keyType, boolean resultToLatestValues);
|
||||||
|
|
||||||
protected abstract Aggregation getCurrentAggregation();
|
protected abstract Aggregation getCurrentAggregation();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -152,7 +152,7 @@ public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
|
|||||||
.scope(TbAttributeSubscriptionScope.SERVER_SCOPE)
|
.scope(TbAttributeSubscriptionScope.SERVER_SCOPE)
|
||||||
.build();
|
.build();
|
||||||
subToDynamicValueKeySet.add(subIdx);
|
subToDynamicValueKeySet.add(subIdx);
|
||||||
localSubscriptionService.addSubscription(sub);
|
localSubscriptionService.addSubscription(sub, sessionRef);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException | ExecutionException e) {
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
log.info("[{}][{}][{}] Failed to resolve dynamic values: {}", tenantId, customerId, userId, dynamicValues.keySet());
|
log.info("[{}][{}][{}] Failed to resolve dynamic values: {}", tenantId, customerId, userId, dynamicValues.keySet());
|
||||||
|
|||||||
@ -177,7 +177,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
|
|||||||
.updateProcessor((sub, update) -> sendWsMsg(sub.getSessionId(), update))
|
.updateProcessor((sub, update) -> sendWsMsg(sub.getSessionId(), update))
|
||||||
.ts(startTs)
|
.ts(startTs)
|
||||||
.build();
|
.build();
|
||||||
localSubscriptionService.addSubscription(subscription);
|
localSubscriptionService.addSubscription(subscription, sessionRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -342,7 +342,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
|
|||||||
newSubsList.forEach(entity -> createAlarmSubscriptionForEntity(query.getPageLink(), startTs, entity));
|
newSubsList.forEach(entity -> createAlarmSubscriptionForEntity(query.getPageLink(), startTs, entity));
|
||||||
}
|
}
|
||||||
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId));
|
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId));
|
||||||
subsToAdd.forEach(localSubscriptionService::addSubscription);
|
subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void resetInvocationCounter() {
|
private void resetInvocationCounter() {
|
||||||
@ -361,4 +361,5 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
|
|||||||
EntityDataPageLink edpl = new EntityDataPageLink(maxEntitiesPerAlarmSubscription, 0, null, entitiesSortOrder);
|
EntityDataPageLink edpl = new EntityDataPageLink(maxEntitiesPerAlarmSubscription, 0, null, entitiesSortOrder);
|
||||||
return new EntityDataQuery(query.getEntityFilter(), edpl, query.getEntityFields(), query.getLatestValues(), query.getKeyFilters());
|
return new EntityDataQuery(query.getEntityFilter(), edpl, query.getEntityFields(), query.getLatestValues(), query.getKeyFilters());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -226,7 +226,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId));
|
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId));
|
||||||
subsToAdd.forEach(localSubscriptionService::addSubscription);
|
subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef));
|
||||||
sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription));
|
sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -239,4 +239,5 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
|
|||||||
protected EntityDataQuery buildEntityDataQuery() {
|
protected EntityDataQuery buildEntityDataQuery() {
|
||||||
return query;
|
return query;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
|
|||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent;
|
import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent;
|
||||||
|
import org.thingsboard.server.service.ws.WebSocketSessionRef;
|
||||||
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
|
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
|
||||||
import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate;
|
import org.thingsboard.server.service.ws.notification.sub.NotificationsSubscriptionUpdate;
|
||||||
|
|
||||||
@ -29,7 +30,7 @@ import java.util.List;
|
|||||||
|
|
||||||
public interface TbLocalSubscriptionService {
|
public interface TbLocalSubscriptionService {
|
||||||
|
|
||||||
void addSubscription(TbSubscription<?> subscription);
|
void addSubscription(TbSubscription<?> subscription, WebSocketSessionRef sessionRef);
|
||||||
|
|
||||||
void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback);
|
void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback);
|
||||||
|
|
||||||
|
|||||||
@ -49,6 +49,7 @@ import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
|||||||
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
|
||||||
|
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
|
||||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
@ -223,9 +224,10 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
try {
|
try {
|
||||||
Optional.ofNullable(cmdsHandlers.get(cmd.getType()))
|
Optional.ofNullable(cmdsHandlers.get(cmd.getType()))
|
||||||
.ifPresent(cmdHandler -> cmdHandler.handle(sessionRef, cmd));
|
.ifPresent(cmdHandler -> cmdHandler.handle(sessionRef, cmd));
|
||||||
|
} catch (TbRateLimitsException e) {
|
||||||
|
log.debug("{} Failed to handle WS cmd: {}", sessionRef, cmd, e);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[sessionId: {}, tenantId: {}, userId: {}] Failed to handle WS cmd: {}", sessionId,
|
log.error("{} Failed to handle WS cmd: {}", sessionRef, cmd, e);
|
||||||
sessionRef.getSecurityCtx().getTenantId(), sessionRef.getSecurityCtx().getId(), cmd, e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -467,7 +469,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
|
|
||||||
subLock.lock();
|
subLock.lock();
|
||||||
try {
|
try {
|
||||||
oldSubService.addSubscription(sub);
|
oldSubService.addSubscription(sub, sessionRef);
|
||||||
sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
|
sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
|
||||||
} finally {
|
} finally {
|
||||||
subLock.unlock();
|
subLock.unlock();
|
||||||
@ -580,7 +582,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
|
|
||||||
subLock.lock();
|
subLock.lock();
|
||||||
try {
|
try {
|
||||||
oldSubService.addSubscription(sub);
|
oldSubService.addSubscription(sub, sessionRef);
|
||||||
sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
|
sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), attributesData));
|
||||||
} finally {
|
} finally {
|
||||||
subLock.unlock();
|
subLock.unlock();
|
||||||
@ -677,7 +679,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
|
|
||||||
subLock.lock();
|
subLock.lock();
|
||||||
try {
|
try {
|
||||||
oldSubService.addSubscription(sub);
|
oldSubService.addSubscription(sub, sessionRef);
|
||||||
sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
|
sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
|
||||||
} finally {
|
} finally {
|
||||||
subLock.unlock();
|
subLock.unlock();
|
||||||
@ -732,7 +734,7 @@ public class DefaultWebSocketService implements WebSocketService {
|
|||||||
|
|
||||||
subLock.lock();
|
subLock.lock();
|
||||||
try {
|
try {
|
||||||
oldSubService.addSubscription(sub);
|
oldSubService.addSubscription(sub, sessionRef);
|
||||||
sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
|
sendUpdate(sessionRef, new TelemetrySubscriptionUpdate(cmd.getCmdId(), data));
|
||||||
} finally {
|
} finally {
|
||||||
subLock.unlock();
|
subLock.unlock();
|
||||||
|
|||||||
@ -81,7 +81,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
|
|||||||
.limit(cmd.getLimit())
|
.limit(cmd.getLimit())
|
||||||
.notificationTypes(cmd.getTypes())
|
.notificationTypes(cmd.getTypes())
|
||||||
.build();
|
.build();
|
||||||
localSubscriptionService.addSubscription(subscription);
|
localSubscriptionService.addSubscription(subscription, sessionRef);
|
||||||
|
|
||||||
fetchUnreadNotifications(subscription);
|
fetchUnreadNotifications(subscription);
|
||||||
sendUpdate(sessionRef.getSessionId(), subscription.createFullUpdate());
|
sendUpdate(sessionRef.getSessionId(), subscription.createFullUpdate());
|
||||||
@ -99,7 +99,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
|
|||||||
.entityId(securityCtx.getId())
|
.entityId(securityCtx.getId())
|
||||||
.updateProcessor(this::handleNotificationsCountSubscriptionUpdate)
|
.updateProcessor(this::handleNotificationsCountSubscriptionUpdate)
|
||||||
.build();
|
.build();
|
||||||
localSubscriptionService.addSubscription(subscription);
|
localSubscriptionService.addSubscription(subscription, sessionRef);
|
||||||
|
|
||||||
fetchUnreadNotificationsCount(subscription);
|
fetchUnreadNotificationsCount(subscription);
|
||||||
sendUpdate(sessionRef.getSessionId(), subscription.createUpdate());
|
sendUpdate(sessionRef.getSessionId(), subscription.createUpdate());
|
||||||
|
|||||||
@ -78,6 +78,11 @@ server:
|
|||||||
max_queue_messages_per_session: "${TB_SERVER_WS_DEFAULT_QUEUE_MESSAGES_PER_SESSION:1000}"
|
max_queue_messages_per_session: "${TB_SERVER_WS_DEFAULT_QUEUE_MESSAGES_PER_SESSION:1000}"
|
||||||
# Maximum time between WS session opening and sending auth command
|
# Maximum time between WS session opening and sending auth command
|
||||||
auth_timeout_ms: "${TB_SERVER_WS_AUTH_TIMEOUT_MS:10000}"
|
auth_timeout_ms: "${TB_SERVER_WS_AUTH_TIMEOUT_MS:10000}"
|
||||||
|
rate_limits:
|
||||||
|
# Per-tenant rate limit for WS subscriptions
|
||||||
|
subscriptions_per_tenant: "${TB_SERVER_WS_SUBSCRIPTIONS_PER_TENANT_RATE_LIMIT:2000:60}"
|
||||||
|
# Per-user rate limit for WS subscriptions
|
||||||
|
subscriptions_per_user: "${TB_SERVER_WS_SUBSCRIPTIONS_PER_USER_RATE_LIMIT:500:60}"
|
||||||
rest:
|
rest:
|
||||||
server_side_rpc:
|
server_side_rpc:
|
||||||
# Minimum value of the server-side RPC timeout. May override value provided in the REST API call.
|
# Minimum value of the server-side RPC timeout. May override value provided in the REST API call.
|
||||||
|
|||||||
@ -17,20 +17,45 @@ package org.thingsboard.server.system;
|
|||||||
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.mock.http.client.MockClientHttpRequest;
|
||||||
|
import org.springframework.test.web.client.MockRestServiceServer;
|
||||||
import org.springframework.util.ClassUtils;
|
import org.springframework.util.ClassUtils;
|
||||||
import org.springframework.web.client.RestTemplate;
|
import org.springframework.web.client.RestTemplate;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo;
|
||||||
|
import static org.springframework.test.web.client.response.MockRestResponseCreators.withSuccess;
|
||||||
|
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class RestTemplateConvertersTest {
|
public class RestTemplateConvertersTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testJacksonXmlConverter() {
|
public void testMappingJackson2HttpMessageConverterIsUsedInsteadOfMappingJackson2XmlHttpMessageConverter() {
|
||||||
ClassLoader classLoader = RestTemplate.class.getClassLoader();
|
ClassLoader classLoader = RestTemplate.class.getClassLoader();
|
||||||
boolean jackson2XmlPresent = ClassUtils.isPresent("com.fasterxml.jackson.dataformat.xml.XmlMapper", classLoader);
|
boolean jackson2XmlPresent = ClassUtils.isPresent("com.fasterxml.jackson.dataformat.xml.XmlMapper", classLoader);
|
||||||
Assertions.assertFalse(jackson2XmlPresent, "XmlMapper must not be present in classpath, please, exclude \"jackson-dataformat-xml\" dependency!");
|
assertThat(jackson2XmlPresent).isTrue();
|
||||||
//If this xml mapper will be present in classpath then we will get "Unsupported Media Type" in RestTemplate
|
|
||||||
|
RestTemplate restTemplate = new RestTemplate();
|
||||||
|
MockRestServiceServer mockServer = MockRestServiceServer.createServer(restTemplate);
|
||||||
|
mockServer.expect(requestTo("/test"))
|
||||||
|
.andExpect(request -> {
|
||||||
|
MockClientHttpRequest mockRequest = (MockClientHttpRequest) request;
|
||||||
|
byte[] body = mockRequest.getBodyAsBytes();
|
||||||
|
String requestBody = new String(body, StandardCharsets.UTF_8);
|
||||||
|
assertThat(requestBody).contains("{\"name\":\"test\",\"value\":1}");
|
||||||
|
})
|
||||||
|
.andRespond(withSuccess("{\"name\":\"test\",\"value\":1}", MediaType.APPLICATION_JSON));
|
||||||
|
|
||||||
|
TestObject requestObject = new TestObject("test", 1);
|
||||||
|
TestObject actualObject = restTemplate.postForObject("/test", requestObject, TestObject.class);
|
||||||
|
assertThat(actualObject).isEqualTo(requestObject);
|
||||||
|
mockServer.verify();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
record TestObject(String name, int value) {}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -42,7 +42,8 @@ public enum LimitedApi {
|
|||||||
TRANSPORT_MESSAGES_PER_DEVICE("transport messages per device", false),
|
TRANSPORT_MESSAGES_PER_DEVICE("transport messages per device", false),
|
||||||
TRANSPORT_MESSAGES_PER_GATEWAY("transport messages per gateway", false),
|
TRANSPORT_MESSAGES_PER_GATEWAY("transport messages per gateway", false),
|
||||||
TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE("transport messages per gateway device", false),
|
TRANSPORT_MESSAGES_PER_GATEWAY_DEVICE("transport messages per gateway device", false),
|
||||||
EMAILS("emails sending", true);
|
EMAILS("emails sending", true),
|
||||||
|
WS_SUBSCRIPTIONS("WS subscriptions", false);
|
||||||
|
|
||||||
private Function<DefaultTenantProfileConfiguration, String> configExtractor;
|
private Function<DefaultTenantProfileConfiguration, String> configExtractor;
|
||||||
@Getter
|
@Getter
|
||||||
|
|||||||
@ -30,4 +30,10 @@ public class TbRateLimitsException extends AbstractRateLimitException {
|
|||||||
super(entityType.name() + " rate limits reached!");
|
super(entityType.name() + " rate limits reached!");
|
||||||
this.entityType = entityType;
|
this.entityType = entityType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public TbRateLimitsException(String message) {
|
||||||
|
super(message);
|
||||||
|
this.entityType = null;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,44 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2024 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.common.util;
|
||||||
|
|
||||||
|
import org.springframework.util.ConcurrentReferenceHashMap;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType.SOFT;
|
||||||
|
|
||||||
|
public class DeduplicationUtil {
|
||||||
|
|
||||||
|
private static final ConcurrentMap<Object, Long> cache = new ConcurrentReferenceHashMap<>(16, SOFT);
|
||||||
|
|
||||||
|
public static boolean alreadyProcessed(Object deduplicationKey, long deduplicationDuration) {
|
||||||
|
AtomicBoolean alreadyProcessed = new AtomicBoolean(false);
|
||||||
|
cache.compute(deduplicationKey, (key, lastProcessedTs) -> {
|
||||||
|
if (lastProcessedTs != null) {
|
||||||
|
long passed = System.currentTimeMillis() - lastProcessedTs;
|
||||||
|
if (passed <= deduplicationDuration) {
|
||||||
|
alreadyProcessed.set(true);
|
||||||
|
return lastProcessedTs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return System.currentTimeMillis();
|
||||||
|
});
|
||||||
|
return alreadyProcessed.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -15,11 +15,13 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.dao.sqlts.dictionary;
|
package org.thingsboard.server.dao.sqlts.dictionary;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.hibernate.exception.ConstraintViolationException;
|
import org.hibernate.exception.ConstraintViolationException;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.dao.DataIntegrityViolationException;
|
import org.springframework.dao.DataIntegrityViolationException;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.transaction.annotation.Propagation;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
import org.thingsboard.server.dao.dictionary.KeyDictionaryDao;
|
import org.thingsboard.server.dao.dictionary.KeyDictionaryDao;
|
||||||
import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryCompositeKey;
|
import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryCompositeKey;
|
||||||
import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry;
|
import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry;
|
||||||
@ -34,14 +36,15 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@SqlDao
|
@SqlDao
|
||||||
|
@RequiredArgsConstructor
|
||||||
public class JpaKeyDictionaryDao extends JpaAbstractDaoListeningExecutorService implements KeyDictionaryDao {
|
public class JpaKeyDictionaryDao extends JpaAbstractDaoListeningExecutorService implements KeyDictionaryDao {
|
||||||
|
|
||||||
|
private final KeyDictionaryRepository keyDictionaryRepository;
|
||||||
|
|
||||||
private final ConcurrentMap<String, Integer> keyDictionaryMap = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, Integer> keyDictionaryMap = new ConcurrentHashMap<>();
|
||||||
protected static final ReentrantLock creationLock = new ReentrantLock();
|
private static final ReentrantLock creationLock = new ReentrantLock();
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private KeyDictionaryRepository keyDictionaryRepository;
|
|
||||||
|
|
||||||
|
@Transactional(propagation = Propagation.NOT_SUPPORTED)
|
||||||
@Override
|
@Override
|
||||||
public Integer getOrSaveKeyId(String strKey) {
|
public Integer getOrSaveKeyId(String strKey) {
|
||||||
Integer keyId = keyDictionaryMap.get(strKey);
|
Integer keyId = keyDictionaryMap.get(strKey);
|
||||||
|
|||||||
4
pom.xml
4
pom.xml
@ -2097,10 +2097,6 @@
|
|||||||
<groupId>io.jsonwebtoken</groupId>
|
<groupId>io.jsonwebtoken</groupId>
|
||||||
<artifactId>jjwt-impl</artifactId>
|
<artifactId>jjwt-impl</artifactId>
|
||||||
</exclusion>
|
</exclusion>
|
||||||
<exclusion>
|
|
||||||
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
|
||||||
<artifactId>jackson-dataformat-xml</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
</exclusions>
|
</exclusions>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|||||||
@ -18,7 +18,7 @@ import { ResourcesService } from '@core/services/resources.service';
|
|||||||
import { Observable } from 'rxjs';
|
import { Observable } from 'rxjs';
|
||||||
import { ValueTypeData } from '@shared/models/constants';
|
import { ValueTypeData } from '@shared/models/constants';
|
||||||
|
|
||||||
export const noLeadTrailSpacesRegex = /^(?! )[\S\s]*(?<! )$/;
|
export const noLeadTrailSpacesRegex = /^\S+(?: \S+)*$/;
|
||||||
|
|
||||||
export enum StorageTypes {
|
export enum StorageTypes {
|
||||||
MEMORY = 'memory',
|
MEMORY = 'memory',
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user