Merge branch 'hotfix/3.7' of github.com:thingsboard/thingsboard into merge-with-hotfix-3.7
This commit is contained in:
commit
235e877337
@ -117,7 +117,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
||||
}
|
||||
callback.onSuccess();
|
||||
if (event.hasTsOrAttrSub()) {
|
||||
sendSubEventCallback(serviceId, entityId, event.getSeqNumber());
|
||||
sendSubEventCallback(tenantId, serviceId, entityId, event.getSeqNumber());
|
||||
}
|
||||
} else {
|
||||
log.warn("[{}][{}][{}] Event belongs to external partition. Probably re-balancing is in progress. Topic: {}"
|
||||
@ -141,12 +141,12 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
||||
}
|
||||
}
|
||||
|
||||
private void sendSubEventCallback(String targetId, EntityId entityId, int seqNumber) {
|
||||
private void sendSubEventCallback(TenantId tenantId, String targetId, EntityId entityId, int seqNumber) {
|
||||
var update = getEntityUpdatesInfo(entityId);
|
||||
if (serviceId.equals(targetId)) {
|
||||
localSubscriptionService.onSubEventCallback(entityId, seqNumber, update, TbCallback.EMPTY);
|
||||
localSubscriptionService.onSubEventCallback(tenantId, entityId, seqNumber, update, TbCallback.EMPTY);
|
||||
} else {
|
||||
sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(entityId.getId(), seqNumber, update));
|
||||
sendCoreNotification(targetId, entityId, TbSubscriptionUtils.toProto(tenantId, entityId.getId(), seqNumber, update));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -22,6 +22,7 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.ConcurrentReferenceHashMap;
|
||||
import org.thingsboard.common.util.DeduplicationUtil;
|
||||
import org.thingsboard.common.util.DonAsynchron;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
@ -121,12 +122,12 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
||||
private String serviceId;
|
||||
private ExecutorService subscriptionUpdateExecutor;
|
||||
|
||||
private final Lock subsLock = new ReentrantLock();
|
||||
private final ConcurrentReferenceHashMap<TenantId, Lock> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.SOFT);
|
||||
|
||||
@PostConstruct
|
||||
public void initExecutor() {
|
||||
subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass());
|
||||
tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback"));
|
||||
tsCallBackExecutor = Executors.newFixedThreadPool(8, ThingsBoardThreadFactory.forName("ts-sub-callback")); //since we are using locks by TenantId
|
||||
serviceId = serviceInfoProvider.getServiceId();
|
||||
staleSessionCleanupExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("stale-session-cleanup"));
|
||||
staleSessionCleanupExecutor.scheduleWithFixedDelay(this::cleanupStaleSessions, 60, 60, TimeUnit.SECONDS);
|
||||
@ -198,6 +199,10 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
||||
});
|
||||
}
|
||||
|
||||
Lock getSubsLock(TenantId tenantId) {
|
||||
return locks.computeIfAbsent(tenantId, x -> new ReentrantLock());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addSubscription(TbSubscription<?> subscription, WebSocketSessionRef sessionRef) {
|
||||
TenantId tenantId = subscription.getTenantId();
|
||||
@ -213,6 +218,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
||||
|
||||
log.debug("[{}][{}] Register subscription: {}", tenantId, entityId, subscription);
|
||||
SubscriptionModificationResult result;
|
||||
final Lock subsLock = getSubsLock(tenantId);
|
||||
subsLock.lock();
|
||||
try {
|
||||
Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>());
|
||||
@ -228,19 +234,26 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
||||
|
||||
@Override
|
||||
public void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback) {
|
||||
TenantId tenantId;
|
||||
if (subEventCallback.getTenantIdMSB() == 0 && subEventCallback.getTenantIdLSB() == 0) {
|
||||
tenantId = TenantId.SYS_TENANT_ID; //TODO: remove after release
|
||||
} else {
|
||||
tenantId = TenantId.fromUUID(new UUID(subEventCallback.getTenantIdMSB(), subEventCallback.getTenantIdLSB()));
|
||||
}
|
||||
UUID entityId = new UUID(subEventCallback.getEntityIdMSB(), subEventCallback.getEntityIdLSB());
|
||||
onSubEventCallback(entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback);
|
||||
onSubEventCallback(tenantId, entityId, subEventCallback.getSeqNumber(), new TbEntityUpdatesInfo(subEventCallback.getAttributesUpdateTs(), subEventCallback.getTimeSeriesUpdateTs()), callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
|
||||
onSubEventCallback(entityId.getId(), seqNumber, entityUpdatesInfo, callback);
|
||||
public void onSubEventCallback(TenantId tenantId, EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
|
||||
onSubEventCallback(tenantId, entityId.getId(), seqNumber, entityUpdatesInfo, callback);
|
||||
}
|
||||
|
||||
public void onSubEventCallback(UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
|
||||
log.debug("[{}][{}] Processing sub event callback: {}.", entityId, seqNumber, entityUpdatesInfo);
|
||||
private void onSubEventCallback(TenantId tenantId, UUID entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback callback) {
|
||||
log.debug("[{}][{}][{}] Processing sub event callback: {}.", tenantId, entityId, seqNumber, entityUpdatesInfo);
|
||||
entityUpdates.put(entityId, entityUpdatesInfo);
|
||||
Set<TbSubscription<?>> pendingSubs = null;
|
||||
Lock subsLock = getSubsLock(tenantId);
|
||||
subsLock.lock();
|
||||
try {
|
||||
TbEntityLocalSubsInfo entitySubs = subscriptionsByEntityId.get(entityId);
|
||||
@ -257,24 +270,26 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelSubscription(String sessionId, int subscriptionId) {
|
||||
log.debug("[{}][{}] Going to remove subscription.", sessionId, subscriptionId);
|
||||
public void cancelSubscription(TenantId tenantId, String sessionId, int subscriptionId) {
|
||||
log.debug("[{}][{}][{}] Going to remove subscription.", tenantId, sessionId, subscriptionId);
|
||||
SubscriptionModificationResult result = null;
|
||||
Lock subsLock = getSubsLock(tenantId);
|
||||
subsLock.lock();
|
||||
try {
|
||||
Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.get(sessionId);
|
||||
if (sessionSubscriptions != null) {
|
||||
TbSubscription<?> subscription = sessionSubscriptions.remove(subscriptionId);
|
||||
if (subscription != null) {
|
||||
|
||||
if (sessionSubscriptions.isEmpty()) {
|
||||
subscriptionsBySessionId.remove(sessionId);
|
||||
}
|
||||
result = modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false);
|
||||
} else {
|
||||
log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId);
|
||||
log.debug("[{}][{}][{}] Subscription not found!", tenantId, sessionId, subscriptionId);
|
||||
}
|
||||
} else {
|
||||
log.debug("[{}] No session subscriptions found!", sessionId);
|
||||
log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId);
|
||||
}
|
||||
} finally {
|
||||
subsLock.unlock();
|
||||
@ -285,18 +300,19 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelAllSessionSubscriptions(String sessionId) {
|
||||
log.debug("[{}] Going to remove session subscriptions.", sessionId);
|
||||
public void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId) {
|
||||
log.debug("[{}][{}] Going to remove session subscriptions.", tenantId, sessionId);
|
||||
List<SubscriptionModificationResult> results = new ArrayList<>();
|
||||
Lock subsLock = getSubsLock(tenantId);
|
||||
subsLock.lock();
|
||||
try {
|
||||
Map<Integer, TbSubscription<?>> sessionSubscriptions = subscriptionsBySessionId.remove(sessionId);
|
||||
if (sessionSubscriptions != null) {
|
||||
for (TbSubscription<?> subscription : sessionSubscriptions.values()) {
|
||||
results.add(modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false));
|
||||
results.add(modifySubscription(tenantId, subscription.getEntityId(), subscription, false));
|
||||
}
|
||||
} else {
|
||||
log.debug("[{}] No session subscriptions found!", sessionId);
|
||||
log.debug("[{}][{}] No session subscriptions found!", tenantId, sessionId);
|
||||
}
|
||||
} finally {
|
||||
subsLock.unlock();
|
||||
@ -604,7 +620,12 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
|
||||
}
|
||||
|
||||
private void cleanupStaleSessions() {
|
||||
subscriptionsBySessionId.keySet().forEach(webSocketService::cleanupIfStale);
|
||||
subscriptionsBySessionId.forEach((sessionId, subscriptions) ->
|
||||
subscriptions.values()
|
||||
.stream()
|
||||
.findAny()
|
||||
.ifPresent(subscription -> webSocketService.cleanupIfStale(subscription.getTenantId(), sessionId))
|
||||
);
|
||||
}
|
||||
|
||||
private void handleRateLimitError(TbSubscription<?> subscription, WebSocketSessionRef sessionRef, String message) {
|
||||
|
||||
@ -113,7 +113,7 @@ public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends
|
||||
public void clearEntitySubscriptions() {
|
||||
if (subToEntityIdMap != null) {
|
||||
for (Integer subId : subToEntityIdMap.keySet()) {
|
||||
localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId);
|
||||
localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId);
|
||||
}
|
||||
subToEntityIdMap.clear();
|
||||
}
|
||||
|
||||
@ -303,7 +303,7 @@ public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
|
||||
protected void clearDynamicValueSubscriptions() {
|
||||
if (subToDynamicValueKeySet != null) {
|
||||
for (Integer subId : subToDynamicValueKeySet) {
|
||||
localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId);
|
||||
localSubscriptionService.cancelSubscription(getTenantId(), sessionRef.getSessionId(), subId);
|
||||
}
|
||||
subToDynamicValueKeySet.clear();
|
||||
}
|
||||
|
||||
@ -341,7 +341,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
|
||||
long startTs = System.currentTimeMillis() - query.getPageLink().getTimeWindow();
|
||||
newSubsList.forEach(entity -> createAlarmSubscriptionForEntity(query.getPageLink(), startTs, entity));
|
||||
}
|
||||
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId));
|
||||
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId));
|
||||
subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef));
|
||||
}
|
||||
|
||||
|
||||
@ -225,7 +225,7 @@ public class TbEntityDataSubCtx extends TbAbstractDataSubCtx<EntityDataQuery> {
|
||||
}
|
||||
}
|
||||
}
|
||||
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getSessionId(), subId));
|
||||
subIdsToCancel.forEach(subId -> localSubscriptionService.cancelSubscription(getTenantId(), getSessionId(), subId));
|
||||
subsToAdd.forEach(subscription -> localSubscriptionService.addSubscription(subscription, sessionRef));
|
||||
sendWsMsg(new EntityDataUpdate(cmdId, data, null, maxEntitiesPerDataSubscription));
|
||||
}
|
||||
|
||||
@ -27,8 +27,6 @@ import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* Information about the local websocket subscriptions.
|
||||
@ -42,8 +40,6 @@ public class TbEntityLocalSubsInfo {
|
||||
@Getter
|
||||
private final EntityId entityId;
|
||||
@Getter
|
||||
private final Lock lock = new ReentrantLock();
|
||||
@Getter
|
||||
private final Set<TbSubscription<?>> subs = ConcurrentHashMap.newKeySet();
|
||||
private volatile TbSubscriptionsInfo state = new TbSubscriptionsInfo();
|
||||
|
||||
|
||||
@ -34,11 +34,11 @@ public interface TbLocalSubscriptionService {
|
||||
|
||||
void onSubEventCallback(TransportProtos.TbEntitySubEventCallbackProto subEventCallback, TbCallback callback);
|
||||
|
||||
void onSubEventCallback(EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback empty);
|
||||
void onSubEventCallback(TenantId tenantId, EntityId entityId, int seqNumber, TbEntityUpdatesInfo entityUpdatesInfo, TbCallback empty);
|
||||
|
||||
void cancelSubscription(String sessionId, int subscriptionId);
|
||||
void cancelSubscription(TenantId tenantId, String sessionId, int subscriptionId);
|
||||
|
||||
void cancelAllSessionSubscriptions(String sessionId);
|
||||
void cancelAllSessionSubscriptions(TenantId tenantId, String sessionId);
|
||||
|
||||
void onTimeSeriesUpdate(TransportProtos.TbSubUpdateProto tsUpdate, TbCallback callback);
|
||||
|
||||
|
||||
@ -82,10 +82,12 @@ public class TbSubscriptionUtils {
|
||||
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder).build();
|
||||
}
|
||||
|
||||
public static ToCoreNotificationMsg toProto(UUID id, int seqNumber, TbEntityUpdatesInfo update) {
|
||||
public static ToCoreNotificationMsg toProto(TenantId tenantId, UUID id, int seqNumber, TbEntityUpdatesInfo update) {
|
||||
TransportProtos.TbEntitySubEventCallbackProto.Builder updateProto = TransportProtos.TbEntitySubEventCallbackProto.newBuilder()
|
||||
.setEntityIdMSB(id.getMostSignificantBits())
|
||||
.setEntityIdLSB(id.getLeastSignificantBits())
|
||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||
.setSeqNumber(seqNumber)
|
||||
.setAttributesUpdateTs(update.attributesUpdateTs)
|
||||
.setTimeSeriesUpdateTs(update.timeSeriesUpdateTs);
|
||||
|
||||
@ -193,17 +193,18 @@ public class DefaultWebSocketService implements WebSocketService {
|
||||
@Override
|
||||
public void handleSessionEvent(WebSocketSessionRef sessionRef, SessionEvent event) {
|
||||
String sessionId = sessionRef.getSessionId();
|
||||
TenantId tenantId = sessionRef.getSecurityCtx().getTenantId();
|
||||
log.debug(PROCESSING_MSG, sessionId, event);
|
||||
switch (event.getEventType()) {
|
||||
case ESTABLISHED:
|
||||
wsSessionsMap.put(sessionId, new WsSessionMetaData(sessionRef));
|
||||
break;
|
||||
case ERROR:
|
||||
log.debug("[{}] Unknown websocket session error: ", sessionId,
|
||||
log.debug("[{}][{}] Unknown websocket session error: ", tenantId, sessionId,
|
||||
event.getError().orElse(new RuntimeException("No error specified")));
|
||||
break;
|
||||
case CLOSED:
|
||||
cleanupSessionById(sessionId);
|
||||
cleanupSessionById(tenantId, sessionId);
|
||||
processSessionClose(sessionRef);
|
||||
break;
|
||||
}
|
||||
@ -297,10 +298,10 @@ public class DefaultWebSocketService implements WebSocketService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanupIfStale(String sessionId) {
|
||||
public void cleanupIfStale(TenantId tenantId, String sessionId) {
|
||||
if (!msgEndpoint.isOpen(sessionId)) {
|
||||
log.info("[{}] Cleaning up stale session ", sessionId);
|
||||
cleanupSessionById(sessionId);
|
||||
cleanupSessionById(tenantId, sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -754,22 +755,23 @@ public class DefaultWebSocketService implements WebSocketService {
|
||||
}
|
||||
|
||||
private void unsubscribe(WebSocketSessionRef sessionRef, SubscriptionCmd cmd, String sessionId) {
|
||||
TenantId tenantId = sessionRef.getSecurityCtx().getTenantId();
|
||||
if (cmd.getEntityId() == null || cmd.getEntityId().isEmpty()) {
|
||||
log.warn("[{}][{}] Cleanup session due to empty entity id.", sessionId, cmd.getCmdId());
|
||||
cleanupSessionById(sessionId);
|
||||
log.warn("[{}][{}][{}] Cleanup session due to empty entity id.", tenantId, sessionId, cmd.getCmdId());
|
||||
cleanupSessionById(tenantId, sessionId);
|
||||
} else {
|
||||
Integer subId = sessionCmdMap.getOrDefault(sessionId, Collections.emptyMap()).remove(cmd.getCmdId());
|
||||
if (subId == null) {
|
||||
log.trace("[{}][{}] Failed to lookup subscription id mapping", sessionId, cmd.getCmdId());
|
||||
log.trace("[{}][{}][{}] Failed to lookup subscription id mapping", tenantId, sessionId, cmd.getCmdId());
|
||||
subId = cmd.getCmdId();
|
||||
}
|
||||
oldSubService.cancelSubscription(sessionId, subId);
|
||||
oldSubService.cancelSubscription(tenantId, sessionId, subId);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupSessionById(String sessionId) {
|
||||
private void cleanupSessionById(TenantId tenantId, String sessionId) {
|
||||
wsSessionsMap.remove(sessionId);
|
||||
oldSubService.cancelAllSessionSubscriptions(sessionId);
|
||||
oldSubService.cancelAllSessionSubscriptions(tenantId, sessionId);
|
||||
sessionCmdMap.remove(sessionId);
|
||||
entityDataSubService.cancelAllSessionSubscriptions(sessionId);
|
||||
}
|
||||
|
||||
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.service.ws;
|
||||
|
||||
import org.springframework.web.socket.CloseStatus;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.service.subscription.SubscriptionErrorCode;
|
||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdate;
|
||||
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
|
||||
@ -37,6 +38,6 @@ public interface WebSocketService {
|
||||
|
||||
void close(String sessionId, CloseStatus status);
|
||||
|
||||
void cleanupIfStale(String sessionId);
|
||||
void cleanupIfStale(TenantId tenantId, String sessionId);
|
||||
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.service.ws;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.service.security.model.SecurityUser;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
@ -39,6 +40,10 @@ public class WebSocketSessionRef {
|
||||
private final WebSocketSessionType sessionType;
|
||||
private final AtomicInteger sessionSubIdSeq = new AtomicInteger();
|
||||
|
||||
public TenantId getTenantId() {
|
||||
return securityCtx != null ? securityCtx.getTenantId() : TenantId.SYS_TENANT_ID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
|
||||
@ -249,7 +249,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
|
||||
|
||||
@Override
|
||||
public void handleUnsubCmd(WebSocketSessionRef sessionRef, UnsubscribeCmd cmd) {
|
||||
localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), cmd.getCmdId());
|
||||
localSubscriptionService.cancelSubscription(sessionRef.getTenantId(), sessionRef.getSessionId(), cmd.getCmdId());
|
||||
}
|
||||
|
||||
private void sendUpdate(String sessionId, CmdUpdate update) {
|
||||
|
||||
@ -834,6 +834,8 @@ message TbEntitySubEventCallbackProto {
|
||||
int32 seqNumber = 3;
|
||||
int64 attributesUpdateTs = 4;
|
||||
int64 timeSeriesUpdateTs = 5;
|
||||
int64 tenantIdMSB = 6;
|
||||
int64 tenantIdLSB = 7;
|
||||
}
|
||||
|
||||
message TsValueProto {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user