Resolve merge conflicts

This commit is contained in:
ViacheslavKlimov 2023-03-06 10:45:32 +02:00
parent 069ef4de5b
commit a0796de0e4
4 changed files with 27 additions and 111 deletions

View File

@ -85,13 +85,8 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
@Value("${server.ws.ping_timeout:30000}") @Value("${server.ws.ping_timeout:30000}")
private long pingTimeout; private long pingTimeout;
<<<<<<< HEAD private final ConcurrentMap<String, WebSocketSessionRef> blacklistedSessions = new ConcurrentHashMap<>();
private ConcurrentMap<String, WebSocketSessionRef> blacklistedSessions = new ConcurrentHashMap<>();
private ConcurrentMap<String, TbRateLimits> perSessionUpdateLimits = new ConcurrentHashMap<>();
=======
private final ConcurrentMap<String, TelemetryWebSocketSessionRef> blacklistedSessions = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TbRateLimits> perSessionUpdateLimits = new ConcurrentHashMap<>(); private final ConcurrentMap<String, TbRateLimits> perSessionUpdateLimits = new ConcurrentHashMap<>();
>>>>>>> upstream/develop/3.5
private final ConcurrentMap<TenantId, Set<String>> tenantSessionsMap = new ConcurrentHashMap<>(); private final ConcurrentMap<TenantId, Set<String>> tenantSessionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<CustomerId, Set<String>> customerSessionsMap = new ConcurrentHashMap<>(); private final ConcurrentMap<CustomerId, Set<String>> customerSessionsMap = new ConcurrentHashMap<>();

View File

@ -218,7 +218,6 @@ public class DefaultWebSocketService implements WebSocketService {
} }
try { try {
<<<<<<< HEAD:application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java
switch (sessionRef.getSessionType()) { switch (sessionRef.getSessionType()) {
case TELEMETRY: case TELEMETRY:
processTelemetryCmds(sessionRef, msg); processTelemetryCmds(sessionRef, msg);
@ -226,45 +225,6 @@ public class DefaultWebSocketService implements WebSocketService {
case NOTIFICATIONS: case NOTIFICATIONS:
processNotificationCmds(sessionRef, msg); processNotificationCmds(sessionRef, msg);
break; break;
=======
TelemetryPluginCmdsWrapper cmdsWrapper = JacksonUtil.OBJECT_MAPPER.readValue(msg, TelemetryPluginCmdsWrapper.class);
if (cmdsWrapper != null) {
if (cmdsWrapper.getAttrSubCmds() != null) {
cmdsWrapper.getAttrSubCmds().forEach(cmd -> {
if (processSubscription(sessionRef, cmd)) {
handleWsAttributesSubscriptionCmd(sessionRef, cmd);
}
});
}
if (cmdsWrapper.getTsSubCmds() != null) {
cmdsWrapper.getTsSubCmds().forEach(cmd -> {
if (processSubscription(sessionRef, cmd)) {
handleWsTimeseriesSubscriptionCmd(sessionRef, cmd);
}
});
}
if (cmdsWrapper.getHistoryCmds() != null) {
cmdsWrapper.getHistoryCmds().forEach(cmd -> handleWsHistoryCmd(sessionRef, cmd));
}
if (cmdsWrapper.getEntityDataCmds() != null) {
cmdsWrapper.getEntityDataCmds().forEach(cmd -> handleWsEntityDataCmd(sessionRef, cmd));
}
if (cmdsWrapper.getAlarmDataCmds() != null) {
cmdsWrapper.getAlarmDataCmds().forEach(cmd -> handleWsAlarmDataCmd(sessionRef, cmd));
}
if (cmdsWrapper.getEntityCountCmds() != null) {
cmdsWrapper.getEntityCountCmds().forEach(cmd -> handleWsEntityCountCmd(sessionRef, cmd));
}
if (cmdsWrapper.getEntityDataUnsubscribeCmds() != null) {
cmdsWrapper.getEntityDataUnsubscribeCmds().forEach(cmd -> handleWsDataUnsubscribeCmd(sessionRef, cmd));
}
if (cmdsWrapper.getAlarmDataUnsubscribeCmds() != null) {
cmdsWrapper.getAlarmDataUnsubscribeCmds().forEach(cmd -> handleWsDataUnsubscribeCmd(sessionRef, cmd));
}
if (cmdsWrapper.getEntityCountUnsubscribeCmds() != null) {
cmdsWrapper.getEntityCountUnsubscribeCmds().forEach(cmd -> handleWsDataUnsubscribeCmd(sessionRef, cmd));
}
>>>>>>> upstream/develop/3.5:application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
} }
} catch (IOException e) { } catch (IOException e) {
log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e); log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e);
@ -272,9 +232,8 @@ public class DefaultWebSocketService implements WebSocketService {
} }
} }
private void processTelemetryCmds(WebSocketSessionRef sessionRef, String msg) throws JsonProcessingException { private void processTelemetryCmds(WebSocketSessionRef sessionRef, String msg) throws JsonProcessingException {
TelemetryPluginCmdsWrapper cmdsWrapper = jsonMapper.readValue(msg, TelemetryPluginCmdsWrapper.class); TelemetryPluginCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, TelemetryPluginCmdsWrapper.class);
if (cmdsWrapper == null) { if (cmdsWrapper == null) {
return; return;
} }
@ -287,7 +246,7 @@ public class DefaultWebSocketService implements WebSocketService {
} }
private void processNotificationCmds(WebSocketSessionRef sessionRef, String msg) throws IOException { private void processNotificationCmds(WebSocketSessionRef sessionRef, String msg) throws IOException {
NotificationCmdsWrapper cmdsWrapper = jsonMapper.readValue(msg, NotificationCmdsWrapper.class); NotificationCmdsWrapper cmdsWrapper = JacksonUtil.fromString(msg, NotificationCmdsWrapper.class);
for (WsCmdHandler<NotificationCmdsWrapper, ? extends WsCmd> cmdHandler : notificationCmdsHandlers) { for (WsCmdHandler<NotificationCmdsWrapper, ? extends WsCmd> cmdHandler : notificationCmdsHandlers) {
WsCmd cmd = cmdHandler.extractCmd(cmdsWrapper); WsCmd cmd = cmdHandler.extractCmd(cmdsWrapper);
if (cmd != null) { if (cmd != null) {
@ -526,18 +485,14 @@ public class DefaultWebSocketService implements WebSocketService {
.allKeys(false) .allKeys(false)
.keyStates(subState) .keyStates(subState)
.scope(scope) .scope(scope)
<<<<<<< HEAD:application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java .updateProcessor((subscription, update) -> {
.updateProcessor((subscription, update) -> sendWsMsg(subscription.getSessionId(), update))
=======
.updateConsumer((sessionId, update) -> {
subLock.lock(); subLock.lock();
try { try {
sendWsMsg(sessionId, update); sendWsMsg(subscription.getSessionId(), update);
} finally { } finally {
subLock.unlock(); subLock.unlock();
} }
}) })
>>>>>>> upstream/develop/3.5:application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
.build(); .build();
subLock.lock(); subLock.lock();
@ -643,15 +598,10 @@ public class DefaultWebSocketService implements WebSocketService {
.entityId(entityId) .entityId(entityId)
.allKeys(true) .allKeys(true)
.keyStates(subState) .keyStates(subState)
<<<<<<< HEAD:application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java .updateProcessor((subscription, update) -> {
.updateProcessor((subscription, update) -> sendWsMsg(subscription.getSessionId(), update))
.scope(scope).build();
oldSubService.addSubscription(sub);
=======
.updateConsumer((sessionId, update) -> {
subLock.lock(); subLock.lock();
try { try {
sendWsMsg(sessionId, update); sendWsMsg(subscription.getSessionId(), update);
} finally { } finally {
subLock.unlock(); subLock.unlock();
} }
@ -666,7 +616,6 @@ public class DefaultWebSocketService implements WebSocketService {
} finally { } finally {
subLock.unlock(); subLock.unlock();
} }
>>>>>>> upstream/develop/3.5:application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
} }
@Override @Override
@ -749,20 +698,17 @@ public class DefaultWebSocketService implements WebSocketService {
.subscriptionId(cmd.getCmdId()) .subscriptionId(cmd.getCmdId())
.tenantId(sessionRef.getSecurityCtx().getTenantId()) .tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId) .entityId(entityId)
<<<<<<< HEAD:application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java .updateProcessor((subscription, update) -> {
.updateProcessor((subscription, update) -> sendWsMsg(subscription.getSessionId(), update))
=======
.updateConsumer((sessionId, update) -> {
subLock.lock(); subLock.lock();
try { try {
sendWsMsg(sessionId, update); sendWsMsg(subscription.getSessionId(), update);
} finally { } finally {
subLock.unlock(); subLock.unlock();
} }
}) })
>>>>>>> upstream/develop/3.5:application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
.allKeys(true) .allKeys(true)
.keyStates(subState).build(); .keyStates(subState)
.build();
subLock.lock(); subLock.lock();
try { try {
@ -805,20 +751,17 @@ public class DefaultWebSocketService implements WebSocketService {
.subscriptionId(cmd.getCmdId()) .subscriptionId(cmd.getCmdId())
.tenantId(sessionRef.getSecurityCtx().getTenantId()) .tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId) .entityId(entityId)
<<<<<<< HEAD:application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java .updateProcessor((subscription, update) -> {
.updateProcessor((subscription, update) -> sendWsMsg(subscription.getSessionId(), update))
=======
.updateConsumer((sessionId, update) -> {
subLock.lock(); subLock.lock();
try { try {
sendWsMsg(sessionId, update); sendWsMsg(subscription.getSessionId(), update);
} finally { } finally {
subLock.unlock(); subLock.unlock();
} }
}) })
>>>>>>> upstream/develop/3.5:application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java
.allKeys(false) .allKeys(false)
.keyStates(subState).build(); .keyStates(subState)
.build();
subLock.lock(); subLock.lock();
try{ try{

View File

@ -52,12 +52,8 @@ public abstract class AbstractControllerTest extends AbstractNotifyEntityTest {
@LocalServerPort @LocalServerPort
protected int wsPort; protected int wsPort;
<<<<<<< HEAD
private TbTestWebSocketClient wsClient; // lazy
private TbTestWebSocketClient anotherWsClient; // lazy
=======
private volatile TbTestWebSocketClient wsClient; // lazy private volatile TbTestWebSocketClient wsClient; // lazy
>>>>>>> upstream/develop/3.5 private volatile TbTestWebSocketClient anotherWsClient; // lazy
public TbTestWebSocketClient getWsClient() { public TbTestWebSocketClient getWsClient() {
if (wsClient == null) { if (wsClient == null) {

View File

@ -48,11 +48,9 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class TbTestWebSocketClient extends WebSocketClient { public class TbTestWebSocketClient extends WebSocketClient {
<<<<<<< HEAD
@Getter
=======
private static final long TIMEOUT = TimeUnit.SECONDS.toMillis(30); private static final long TIMEOUT = TimeUnit.SECONDS.toMillis(30);
>>>>>>> upstream/develop/3.5
@Getter
private volatile String lastMsg; private volatile String lastMsg;
private volatile CountDownLatch reply; private volatile CountDownLatch reply;
private volatile CountDownLatch update; private volatile CountDownLatch update;
@ -100,11 +98,7 @@ public class TbTestWebSocketClient extends WebSocketClient {
@Override @Override
public void send(String text) throws NotYetConnectedException { public void send(String text) throws NotYetConnectedException {
<<<<<<< HEAD
log.info("SENDING: {}", text);
=======
log.debug("send [{}]", text); log.debug("send [{}]", text);
>>>>>>> upstream/develop/3.5
reply = new CountDownLatch(1); reply = new CountDownLatch(1);
super.send(text); super.send(text);
} }
@ -122,12 +116,11 @@ public class TbTestWebSocketClient extends WebSocketClient {
} }
public String waitForUpdate() { public String waitForUpdate() {
<<<<<<< HEAD
return waitForUpdate(false); return waitForUpdate(false);
} }
public String waitForUpdate(boolean throwExceptionOnTimeout) { public String waitForUpdate(boolean throwExceptionOnTimeout) {
return waitForUpdate(TimeUnit.SECONDS.toMillis(3), throwExceptionOnTimeout); return waitForUpdate(TIMEOUT, throwExceptionOnTimeout);
} }
public String waitForUpdate(long ms) { public String waitForUpdate(long ms) {
@ -135,19 +128,12 @@ public class TbTestWebSocketClient extends WebSocketClient {
} }
public String waitForUpdate(long ms, boolean throwExceptionOnTimeout) { public String waitForUpdate(long ms, boolean throwExceptionOnTimeout) {
log.debug("waitForUpdate [{}]", ms);
try { try {
if (update.await(ms, TimeUnit.MILLISECONDS)) { if (update.await(ms, TimeUnit.MILLISECONDS)) {
return lastMsg; return lastMsg;
======= } else {
return waitForUpdate(TIMEOUT);
}
public String waitForUpdate(long ms) {
log.debug("waitForUpdate [{}]", ms);
try {
if (!update.await(ms, TimeUnit.MILLISECONDS)) {
log.warn("Failed to await update (waiting time [{}]ms elapsed)", ms, new RuntimeException("stacktrace")); log.warn("Failed to await update (waiting time [{}]ms elapsed)", ms, new RuntimeException("stacktrace"));
>>>>>>> upstream/develop/3.5
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.warn("Failed to await update", e); log.warn("Failed to await update", e);
@ -160,30 +146,26 @@ public class TbTestWebSocketClient extends WebSocketClient {
} }
public String waitForReply() { public String waitForReply() {
<<<<<<< HEAD
return waitForReply(false); return waitForReply(false);
} }
public String waitForReply(boolean throwExceptionOnTimeout) { public String waitForReply(boolean throwExceptionOnTimeout) {
try { return waitForReply(TIMEOUT, throwExceptionOnTimeout);
if (reply.await(3, TimeUnit.SECONDS)) {
return lastMsg;
=======
return waitForReply(TIMEOUT);
} }
public String waitForReply(long ms) { public String waitForReply(long ms, boolean throwExceptionOnTimeout) {
log.debug("waitForReply [{}]", ms); log.debug("waitForReply [{}]", ms);
try { try {
if (!reply.await(ms, TimeUnit.MILLISECONDS)) { if (reply.await(ms, TimeUnit.MILLISECONDS)) {
return lastMsg;
} else {
log.warn("Failed to await reply (waiting time [{}]ms elapsed)", ms, new RuntimeException("stacktrace")); log.warn("Failed to await reply (waiting time [{}]ms elapsed)", ms, new RuntimeException("stacktrace"));
>>>>>>> upstream/develop/3.5
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.warn("Failed to await reply", e); log.warn("Failed to await reply", e);
} }
if (throwExceptionOnTimeout) { if (throwExceptionOnTimeout) {
throw new AssertionError("Waited for reply for 3 seconds but none arrived"); throw new AssertionError("Waited for reply for " + ms + " ms but none arrived");
} else { } else {
return null; return null;
} }