Merge pull request #12562 from thingsboard/rc

rc
This commit is contained in:
Viacheslav Klimov 2025-01-31 12:36:24 +02:00 committed by GitHub
commit 85d2867282
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 308 additions and 1 deletions

View File

@ -0,0 +1,49 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jmx.export.MBeanExporter;
import org.thingsboard.server.common.transport.service.DefaultTransportService;
import org.thingsboard.server.queue.util.TbTransportComponent;
import java.util.HashMap;
import java.util.Map;
@Configuration
@TbTransportComponent
@RequiredArgsConstructor
public class DefaultTransportMBeanConfiguration {
private final DefaultTransportService transportService;
@Bean
public HashMapObserver hashMapObserver() {
return new HashMapObserver(transportService.sessions);
}
@Bean
public MBeanExporter mBeanExporter() {
MBeanExporter exporter = new MBeanExporter();
Map<String, Object> beans = new HashMap<>();
beans.put("org.thingsboard:type=TransportSessionMapObserver", hashMapObserver());
exporter.setBeans(beans);
return exporter;
}
}

View File

@ -0,0 +1,188 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
import org.thingsboard.server.transport.mqtt.session.GatewayDeviceSessionContext;
import java.util.Map;
import java.util.UUID;
@RequiredArgsConstructor
@Slf4j
public class HashMapObserver implements HashMapObserverMBean {
private final Map<UUID, SessionMetaData> map;
@Override
public int getSize() {
return map.size();
}
@Override
public long getGatewayCount(String unused) {
return map.values().stream().filter(v-> v.getSessionInfo() != null && v.getSessionInfo().getIsGateway()).count();
}
@Override
public long getNonGatewayCount(String unused) {
return map.values().stream().filter(v-> v.getSessionInfo() != null && !v.getSessionInfo().getIsGateway()).count();
}
@Override
public String getSessionByUUID(String uuid) {
return String.valueOf(map.get(UUID.fromString(uuid)));
}
void addContent(Object entry, int count, StringBuilder content) {
String lineContent = String.valueOf(entry).replaceAll(System.lineSeparator()," ");
log.info("{} content = {}", count, lineContent);
content.append(lineContent).append(System.lineSeparator());
}
@Override
public String getAllSessions(String unused) {
log.info("getAllSessions()");
StringBuilder content = new StringBuilder();
try {
int count = 0;
for (Map.Entry<UUID, SessionMetaData> entry : map.entrySet()) {
addContent(entry, ++count, content);
}
return content.toString();
} catch (Exception e) {
log.error(e.getMessage(), e);
throw e;
}
}
@Override
public String getSubscribedSessions(String unused) {
log.info("getSubscribedSessions()");
StringBuilder content = new StringBuilder();
try {
int count = 0;
for (Map.Entry<UUID, SessionMetaData> entry : map.entrySet()) {
boolean hasSubscription = entry.getValue().isSubscribedToRPC() || entry.getValue().isSubscribedToAttributes();
if (hasSubscription) {
addContent(entry, ++count, content);
}
}
return content.toString();
} catch (Exception e) {
log.error(e.getMessage(), e);
throw e;
}
}
@Override
public String getNonActiveSessions(String unused) {
log.info("getNonActiveSessions()");
StringBuilder content = new StringBuilder();
try {
int count = 0;
for (Map.Entry<UUID, SessionMetaData> entry : map.entrySet()) {
SessionMetaData sessionMetaData = entry.getValue();
if (sessionMetaData.getListener() instanceof MqttTransportHandler) {
MqttTransportHandler listener = (MqttTransportHandler) sessionMetaData.getListener();
if (!listener.deviceSessionCtx.getChannel().channel().isActive()) {
addContent(entry, ++count, content);
}
}
}
return content.toString();
} catch (Exception e) {
log.error(e.getMessage(), e);
throw e;
}
}
@Override
public String getActiveSessions(String unused) {
log.info("getActiveSessions()");
StringBuilder content = new StringBuilder();
try {
int count = 0;
for (Map.Entry<UUID, SessionMetaData> entry : map.entrySet()) {
SessionMetaData sessionMetaData = entry.getValue();
if (sessionMetaData.getListener() instanceof MqttTransportHandler) {
MqttTransportHandler listener = (MqttTransportHandler) sessionMetaData.getListener();
if (listener.deviceSessionCtx.getChannel().channel().isActive()) {
addContent(entry, ++count, content);
}
} else {
addContent(sessionMetaData.getListener().getClass(), ++count, content);
}
}
return content.toString();
} catch (Exception e) {
log.error(e.getMessage(), e);
throw e;
}
}
@Override
public String getGatewayDeviceSessionContextConnectedSessions(String unused) {
log.info("getGatewayDeviceSessionContextConnectedSessions()");
StringBuilder content = new StringBuilder();
try {
int count = 0;
for (Map.Entry<UUID, SessionMetaData> entry : map.entrySet()) {
SessionMetaData sessionMetaData = entry.getValue();
if (sessionMetaData.getListener() instanceof GatewayDeviceSessionContext) {
GatewayDeviceSessionContext listener = (GatewayDeviceSessionContext) sessionMetaData.getListener();
if (listener.isConnected()) {
addContent(entry, ++count, content);
}
}
}
addContent(count, count, content);
return content.toString();
} catch (Exception e) {
log.error(e.getMessage(), e);
throw e;
}
}
@Override
public String getDeviceAwareSessionContextNotConnectedSessions(String unused) {
log.info("getDeviceAwareSessionContextNotConnectedSessions()");
StringBuilder content = new StringBuilder();
try {
int count = 0;
for (Map.Entry<UUID, SessionMetaData> entry : map.entrySet()) {
SessionMetaData sessionMetaData = entry.getValue();
if (sessionMetaData.getListener() instanceof DeviceAwareSessionContext) {
DeviceAwareSessionContext listener = (DeviceAwareSessionContext) sessionMetaData.getListener();
if (!listener.isConnected()) {
addContent(entry, ++count, content);
}
}
}
addContent(count, count, content);
return content.toString();
} catch (Exception e) {
log.error(e.getMessage(), e);
throw e;
}
}
}
// 4a7d85c9-eb4b-4fbc-8f6c-deb158cc9ac7=SessionMetaData(sessionInfo=nodeId: "bestia.local" sessionIdMSB: 5367593433178001340 sessionIdLSB: -8111863975520724281 tenantIdMSB: -1954197196874116625 tenantIdLSB: -7022192637061768255 deviceIdMSB: -5222516332438875665 deviceIdLSB: -7338416368958642691 deviceName: "Demo Device" deviceType: "default" gwSessionIdMSB: 3730140660294699909 gwSessionIdLSB: -7918622346767288875 deviceProfileIdMSB: -1952135612572036625 deviceProfileIdLSB: -7022192637061768255 customerIdMSB: 1405474927960789426 customerIdLSB: -9187201950435737472 gatewayIdMSB: 164837549830312431 gatewayIdLSB: -7338416368958642691 , sessionType=ASYNC, listener=GatewayDeviceSessionContext(super=AbstractGatewayDeviceSessionContext(super=MqttDeviceAwareSessionContext(super=DeviceAwareSessionContext(sessionId=4a7d85c9-eb4b-4fbc-8f6c-deb158cc9ac7, deviceId=b785e510-d34e-11ef-9a28-b5316a4ee5fd, tenantId=e4e14c00-d341-11ef-9e8c-29007391dbc1, deviceInfo=TransportDeviceInfo(tenantId=e4e14c00-d341-11ef-9e8c-29007391dbc1, customerId=13814000-1dd2-11b2-8080-808080808080, deviceProfileId=e4e89f00-d341-11ef-9e8c-29007391dbc1, deviceId=b785e510-d34e-11ef-9a28-b5316a4ee5fd, deviceName=Demo Device, deviceType=default, powerMode=null, additionalInfo={"lastConnectedGateway":"02499ee0-d348-11ef-9a28-b5316a4ee5fd"}, edrxCycle=null, psmActivityTimer=null, pagingTransmissionWindow=null, gateway=false), deviceProfile=DeviceProfile(tenantId=e4e14c00-d341-11ef-9e8c-29007391dbc1, name=default, description=Default device profile, isDefault=true, type=DEFAULT, transportType=DEFAULT, provisionType=DISABLED, defaultRuleChainId=null, defaultDashboardId=null, defaultQueueName=null, profileData=DeviceProfileData(configuration=DefaultDeviceProfileConfiguration(), transportConfiguration=DefaultDeviceProfileTransportConfiguration(), provisionConfiguration=DisabledDeviceProfileProvisionConfiguration(provisionDeviceSecret=null), alarms=null), provisionDeviceKey=null, firmwareId=null, softwareId=null, defaultEdgeRuleChainId=null, externalId=null, version=1), sessionInfo=nodeId: "bestia.local" sessionIdMSB: 5367593433178001340 sessionIdLSB: -8111863975520724281 tenantIdMSB: -1954197196874116625 tenantIdLSB: -7022192637061768255 deviceIdMSB: -5222516332438875665 deviceIdLSB: -7338416368958642691 deviceName: "Demo Device" deviceType: "default" gwSessionIdMSB: 3730140660294699909 gwSessionIdLSB: -7918622346767288875 deviceProfileIdMSB: -1952135612572036625 deviceProfileIdLSB: -7022192637061768255 customerIdMSB: 1405474927960789426 customerIdLSB: -9187201950435737472 gatewayIdMSB: 164837549830312431 gatewayIdLSB: -7338416368958642691 , connected=true), mqttQoSMap={org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher@697a7658=1, org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher@d44c5be8=1, org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher@e3a8acd0=1, org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher@e3d2681a=1, org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher@a65b8616=1, org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher@133a8264=1, org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher@12e92606=1}), parent=org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler@3d4885c2, transportService=org.thingsboard.server.common.transport.service.DefaultTransportService@77c16f87)), scheduledFuture=null, subscribedToAttributes=true, subscribedToRPC=true, overwriteActivityTime=false)
//1

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.mqtt;
public interface HashMapObserverMBean {
int getSize();
long getGatewayCount(String unused);
long getNonGatewayCount(String unused);
String getSessionByUUID(String key);
String getAllSessions(String key);
String getSubscribedSessions(String unused);
String getNonActiveSessions(String unused);
String getActiveSessions(String unused);
String getGatewayDeviceSessionContextConnectedSessions(String unused);
String getDeviceAwareSessionContextNotConnectedSessions(String unused);
}

View File

@ -17,6 +17,8 @@ package org.thingsboard.server.transport.mqtt.session;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.mqtt.MqttMessage;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.id.DeviceId;
@ -34,9 +36,11 @@ import java.util.concurrent.ConcurrentMap;
/**
* Created by ashvayka on 19.01.17.
*/
@ToString(callSuper = true)
@Slf4j
public abstract class AbstractGatewayDeviceSessionContext<T extends AbstractGatewaySessionHandler> extends MqttDeviceAwareSessionContext implements SessionMsgListener {
@Getter
protected final T parent;
private final TransportService transportService;

View File

@ -16,8 +16,10 @@
package org.thingsboard.server.transport.mqtt.session;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
@ -115,6 +117,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private final ConcurrentMap<String, T> devices;
private final ConcurrentMap<String, ListenableFuture<T>> deviceFutures;
protected final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
@Getter
protected final ChannelHandlerContext channel;
protected final DeviceSessionCtx deviceSessionCtx;
protected final GatewayMetricsService gatewayMetricsService;
@ -124,6 +127,7 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
private boolean overwriteDevicesActivity = false;
public AbstractGatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId, boolean overwriteDevicesActivity) {
log.debug("[{}] Gateway connect [{}] session [{}]", deviceSessionCtx.getTenantId(), deviceSessionCtx.getDeviceId(), sessionId);
this.context = deviceSessionCtx.getContext();
this.transportService = context.getTransportService();
this.deviceSessionCtx = deviceSessionCtx;
@ -195,7 +199,27 @@ public abstract class AbstractGatewaySessionHandler<T extends AbstractGatewayDev
}
public void onDevicesDisconnect() {
devices.forEach(this::deregisterSession);
log.debug("[{}] Gateway disconnect [{}]", gateway.getTenantId(), gateway.getDeviceId());
try {
deviceFutures.forEach((name, future) -> {
Futures.addCallback(future, new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
log.debug("[{}] Gateway disconnect [{}] device deregister callback [{}]", gateway.getTenantId(), gateway.getDeviceId(), name);
deregisterSession(name, result);
}
@Override
public void onFailure(Throwable t) {
}
}, MoreExecutors.directExecutor());
});
devices.forEach(this::deregisterSession);
} catch (Exception e) {
log.error("Gateway disconnect failure", e);
}
}
public void onDeviceDeleted(String deviceName) {

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.transport.mqtt.session;
import lombok.ToString;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.auth.TransportDeviceInfo;
@ -24,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
/**
* Created by nickAS21 on 26.12.22
*/
@ToString(callSuper = true)
public class GatewayDeviceSessionContext extends AbstractGatewayDeviceSessionContext<GatewaySessionHandler> {
public GatewayDeviceSessionContext(GatewaySessionHandler parent,

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.transport.mqtt.session;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.ToString;
import org.thingsboard.server.common.transport.session.DeviceAwareSessionContext;
import java.util.List;
@ -27,6 +28,7 @@ import java.util.stream.Collectors;
/**
* Created by ashvayka on 30.08.18.
*/
@ToString(callSuper = true)
public abstract class MqttDeviceAwareSessionContext extends DeviceAwareSessionContext {
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;