sparkplug: Rename Gateway to abstract
This commit is contained in:
parent
4cca1b6e57
commit
7babba1db4
@ -71,7 +71,7 @@ import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto;
|
||||
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
|
||||
import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
|
||||
import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
|
||||
import org.thingsboard.server.transport.mqtt.session.GatewaySessionHandler;
|
||||
import org.thingsboard.server.transport.mqtt.session.AbstractGatewaySessionHandler;
|
||||
import org.thingsboard.server.transport.mqtt.session.MqttTopicMatcher;
|
||||
import org.thingsboard.server.transport.mqtt.session.SparkplugNodeSessionHandler;
|
||||
import org.thingsboard.server.transport.mqtt.util.sparkplug.SparkplugTopic;
|
||||
@ -131,7 +131,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
|
||||
final DeviceSessionCtx deviceSessionCtx;
|
||||
volatile InetSocketAddress address;
|
||||
volatile GatewaySessionHandler gatewaySessionHandler;
|
||||
volatile AbstractGatewaySessionHandler gatewaySessionHandler;
|
||||
volatile SparkplugNodeSessionHandler sparkplugSessionHandler;
|
||||
|
||||
private final ConcurrentHashMap<String, String> otaPackSessions;
|
||||
@ -967,7 +967,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
if (infoNode != null) {
|
||||
JsonNode gatewayNode = infoNode.get("gateway");
|
||||
if (gatewayNode != null && gatewayNode.asBoolean()) {
|
||||
gatewaySessionHandler = new GatewaySessionHandler(deviceSessionCtx, sessionId);
|
||||
gatewaySessionHandler = new AbstractGatewaySessionHandler(deviceSessionCtx, sessionId);
|
||||
if (infoNode.has(DefaultTransportService.OVERWRITE_ACTIVITY_TIME) && infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).isBoolean()) {
|
||||
sessionMetaData.setOverwriteActivityTime(infoNode.get(DefaultTransportService.OVERWRITE_ACTIVITY_TIME).asBoolean());
|
||||
}
|
||||
|
||||
@ -35,12 +35,12 @@ import java.util.concurrent.ConcurrentMap;
|
||||
* Created by ashvayka on 19.01.17.
|
||||
*/
|
||||
@Slf4j
|
||||
public class GatewayDeviceSessionCtx extends MqttDeviceAwareSessionContext implements SessionMsgListener {
|
||||
public class AbstractGatewayDeviceSessionContext extends MqttDeviceAwareSessionContext implements SessionMsgListener {
|
||||
|
||||
private final GatewaySessionHandler parent;
|
||||
private final AbstractGatewaySessionHandler parent;
|
||||
private final TransportService transportService;
|
||||
|
||||
public GatewayDeviceSessionCtx(GatewaySessionHandler parent, TransportDeviceInfo deviceInfo,
|
||||
public AbstractGatewayDeviceSessionContext(AbstractGatewaySessionHandler parent, TransportDeviceInfo deviceInfo,
|
||||
DeviceProfile deviceProfile, ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap,
|
||||
TransportService transportService) {
|
||||
super(UUID.randomUUID(), mqttQoSMap);
|
||||
@ -76,7 +76,7 @@ import static org.thingsboard.server.common.transport.service.DefaultTransportSe
|
||||
* Created by ashvayka on 19.01.17.
|
||||
*/
|
||||
@Slf4j
|
||||
public class GatewaySessionHandler {
|
||||
public class AbstractGatewaySessionHandler {
|
||||
|
||||
private static final String DEFAULT_DEVICE_TYPE = "default";
|
||||
private static final String CAN_T_PARSE_VALUE = "Can't parse value: ";
|
||||
@ -87,13 +87,13 @@ public class GatewaySessionHandler {
|
||||
private final TransportDeviceInfo gateway;
|
||||
private final UUID sessionId;
|
||||
private final ConcurrentMap<String, Lock> deviceCreationLockMap;
|
||||
private final ConcurrentMap<String, GatewayDeviceSessionCtx> devices;
|
||||
private final ConcurrentMap<String, ListenableFuture<GatewayDeviceSessionCtx>> deviceFutures;
|
||||
private final ConcurrentMap<String, AbstractGatewayDeviceSessionContext> devices;
|
||||
private final ConcurrentMap<String, ListenableFuture<AbstractGatewayDeviceSessionContext>> deviceFutures;
|
||||
private final ConcurrentMap<MqttTopicMatcher, Integer> mqttQoSMap;
|
||||
private final ChannelHandlerContext channel;
|
||||
private final DeviceSessionCtx deviceSessionCtx;
|
||||
|
||||
public GatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) {
|
||||
public AbstractGatewaySessionHandler(DeviceSessionCtx deviceSessionCtx, UUID sessionId) {
|
||||
this.context = deviceSessionCtx.getContext();
|
||||
this.transportService = context.getTransportService();
|
||||
this.deviceSessionCtx = deviceSessionCtx;
|
||||
@ -195,7 +195,7 @@ public class GatewaySessionHandler {
|
||||
}
|
||||
|
||||
void deregisterSession(String deviceName) {
|
||||
GatewayDeviceSessionCtx deviceSessionCtx = devices.remove(deviceName);
|
||||
AbstractGatewayDeviceSessionContext deviceSessionCtx = devices.remove(deviceName);
|
||||
if (deviceSessionCtx != null) {
|
||||
deregisterSession(deviceName, deviceSessionCtx);
|
||||
} else {
|
||||
@ -217,9 +217,9 @@ public class GatewaySessionHandler {
|
||||
|
||||
private void processOnConnect(MqttPublishMessage msg, String deviceName, String deviceType) {
|
||||
log.trace("[{}] onDeviceConnect: {}", sessionId, deviceName);
|
||||
Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<GatewayDeviceSessionCtx>() {
|
||||
Futures.addCallback(onDeviceConnect(deviceName, deviceType), new FutureCallback<AbstractGatewayDeviceSessionContext>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable GatewayDeviceSessionCtx result) {
|
||||
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext result) {
|
||||
ack(msg);
|
||||
log.trace("[{}] onDeviceConnectOk: {}", sessionId, deviceName);
|
||||
}
|
||||
@ -232,8 +232,8 @@ public class GatewaySessionHandler {
|
||||
}, context.getExecutor());
|
||||
}
|
||||
|
||||
private ListenableFuture<GatewayDeviceSessionCtx> onDeviceConnect(String deviceName, String deviceType) {
|
||||
GatewayDeviceSessionCtx result = devices.get(deviceName);
|
||||
private ListenableFuture<AbstractGatewayDeviceSessionContext> onDeviceConnect(String deviceName, String deviceType) {
|
||||
AbstractGatewayDeviceSessionContext result = devices.get(deviceName);
|
||||
if (result == null) {
|
||||
Lock deviceCreationLock = deviceCreationLockMap.computeIfAbsent(deviceName, s -> new ReentrantLock());
|
||||
deviceCreationLock.lock();
|
||||
@ -252,9 +252,9 @@ public class GatewaySessionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private ListenableFuture<GatewayDeviceSessionCtx> getDeviceCreationFuture(String deviceName, String deviceType) {
|
||||
final SettableFuture<GatewayDeviceSessionCtx> futureToSet = SettableFuture.create();
|
||||
ListenableFuture<GatewayDeviceSessionCtx> future = deviceFutures.putIfAbsent(deviceName, futureToSet);
|
||||
private ListenableFuture<AbstractGatewayDeviceSessionContext> getDeviceCreationFuture(String deviceName, String deviceType) {
|
||||
final SettableFuture<AbstractGatewayDeviceSessionContext> futureToSet = SettableFuture.create();
|
||||
ListenableFuture<AbstractGatewayDeviceSessionContext> future = deviceFutures.putIfAbsent(deviceName, futureToSet);
|
||||
if (future != null) {
|
||||
return future;
|
||||
}
|
||||
@ -267,7 +267,7 @@ public class GatewaySessionHandler {
|
||||
new TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse>() {
|
||||
@Override
|
||||
public void onSuccess(GetOrCreateDeviceFromGatewayResponse msg) {
|
||||
GatewayDeviceSessionCtx deviceSessionCtx = new GatewayDeviceSessionCtx(GatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
|
||||
AbstractGatewayDeviceSessionContext deviceSessionCtx = new AbstractGatewayDeviceSessionContext(AbstractGatewaySessionHandler.this, msg.getDeviceInfo(), msg.getDeviceProfile(), mqttQoSMap, transportService);
|
||||
if (devices.putIfAbsent(deviceName, deviceSessionCtx) == null) {
|
||||
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
|
||||
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
|
||||
@ -348,7 +348,7 @@ public class GatewaySessionHandler {
|
||||
Futures.addCallback(checkDeviceConnected(deviceName),
|
||||
new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
|
||||
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
|
||||
if (!deviceEntry.getValue().isJsonArray()) {
|
||||
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
|
||||
}
|
||||
@ -380,9 +380,9 @@ public class GatewaySessionHandler {
|
||||
deviceMsgList.forEach(telemetryMsg -> {
|
||||
String deviceName = checkDeviceName(telemetryMsg.getDeviceName());
|
||||
Futures.addCallback(checkDeviceConnected(deviceName),
|
||||
new FutureCallback<GatewayDeviceSessionCtx>() {
|
||||
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
|
||||
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
|
||||
TransportProtos.PostTelemetryMsg msg = telemetryMsg.getMsg();
|
||||
try {
|
||||
TransportProtos.PostTelemetryMsg postTelemetryMsg = ProtoConverter.validatePostTelemetryMsg(msg.toByteArray());
|
||||
@ -408,7 +408,7 @@ public class GatewaySessionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private void processPostTelemetryMsg(GatewayDeviceSessionCtx deviceCtx, TransportProtos.PostTelemetryMsg postTelemetryMsg, String deviceName, int msgId) {
|
||||
private void processPostTelemetryMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.PostTelemetryMsg postTelemetryMsg, String deviceName, int msgId) {
|
||||
transportService.process(deviceCtx.getSessionInfo(), postTelemetryMsg, getPubAckCallback(channel, deviceName, msgId, postTelemetryMsg));
|
||||
}
|
||||
|
||||
@ -419,9 +419,9 @@ public class GatewaySessionHandler {
|
||||
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
|
||||
String deviceName = deviceEntry.getKey();
|
||||
Futures.addCallback(checkDeviceConnected(deviceName),
|
||||
new FutureCallback<GatewayDeviceSessionCtx>() {
|
||||
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
|
||||
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
|
||||
if (!deviceEntry.getValue().isJsonObject()) {
|
||||
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
|
||||
}
|
||||
@ -453,9 +453,9 @@ public class GatewaySessionHandler {
|
||||
claimMsgList.forEach(claimDeviceMsg -> {
|
||||
String deviceName = checkDeviceName(claimDeviceMsg.getDeviceName());
|
||||
Futures.addCallback(checkDeviceConnected(deviceName),
|
||||
new FutureCallback<GatewayDeviceSessionCtx>() {
|
||||
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
|
||||
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
|
||||
TransportApiProtos.ClaimDevice claimRequest = claimDeviceMsg.getClaimRequest();
|
||||
if (claimRequest == null) {
|
||||
throw new IllegalArgumentException("Claim request for device: " + deviceName + " is null!");
|
||||
@ -484,7 +484,7 @@ public class GatewaySessionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private void processClaimDeviceMsg(GatewayDeviceSessionCtx deviceCtx, TransportProtos.ClaimDeviceMsg claimDeviceMsg, String deviceName, int msgId) {
|
||||
private void processClaimDeviceMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.ClaimDeviceMsg claimDeviceMsg, String deviceName, int msgId) {
|
||||
transportService.process(deviceCtx.getSessionInfo(), claimDeviceMsg, getPubAckCallback(channel, deviceName, msgId, claimDeviceMsg));
|
||||
}
|
||||
|
||||
@ -495,9 +495,9 @@ public class GatewaySessionHandler {
|
||||
for (Map.Entry<String, JsonElement> deviceEntry : jsonObj.entrySet()) {
|
||||
String deviceName = deviceEntry.getKey();
|
||||
Futures.addCallback(checkDeviceConnected(deviceName),
|
||||
new FutureCallback<GatewayDeviceSessionCtx>() {
|
||||
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
|
||||
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
|
||||
if (!deviceEntry.getValue().isJsonObject()) {
|
||||
throw new JsonSyntaxException(CAN_T_PARSE_VALUE + json);
|
||||
}
|
||||
@ -524,9 +524,9 @@ public class GatewaySessionHandler {
|
||||
attributesMsgList.forEach(attributesMsg -> {
|
||||
String deviceName = checkDeviceName(attributesMsg.getDeviceName());
|
||||
Futures.addCallback(checkDeviceConnected(deviceName),
|
||||
new FutureCallback<GatewayDeviceSessionCtx>() {
|
||||
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
|
||||
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
|
||||
TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg();
|
||||
if (kvListProto == null) {
|
||||
throw new IllegalArgumentException("Attributes List for device: " + deviceName + " is empty!");
|
||||
@ -554,7 +554,7 @@ public class GatewaySessionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private void processPostAttributesMsg(GatewayDeviceSessionCtx deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) {
|
||||
private void processPostAttributesMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.PostAttributeMsg postAttributeMsg, String deviceName, int msgId) {
|
||||
transportService.process(deviceCtx.getSessionInfo(), postAttributeMsg, getPubAckCallback(channel, deviceName, msgId, postAttributeMsg));
|
||||
}
|
||||
|
||||
@ -603,9 +603,9 @@ public class GatewaySessionHandler {
|
||||
JsonObject jsonObj = json.getAsJsonObject();
|
||||
String deviceName = jsonObj.get(DEVICE_PROPERTY).getAsString();
|
||||
Futures.addCallback(checkDeviceConnected(deviceName),
|
||||
new FutureCallback<GatewayDeviceSessionCtx>() {
|
||||
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
|
||||
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
|
||||
Integer requestId = jsonObj.get("id").getAsInt();
|
||||
String data = jsonObj.get("data").toString();
|
||||
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
|
||||
@ -628,9 +628,9 @@ public class GatewaySessionHandler {
|
||||
TransportApiProtos.GatewayRpcResponseMsg gatewayRpcResponseMsg = TransportApiProtos.GatewayRpcResponseMsg.parseFrom(getBytes(payload));
|
||||
String deviceName = checkDeviceName(gatewayRpcResponseMsg.getDeviceName());
|
||||
Futures.addCallback(checkDeviceConnected(deviceName),
|
||||
new FutureCallback<GatewayDeviceSessionCtx>() {
|
||||
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
|
||||
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
|
||||
Integer requestId = gatewayRpcResponseMsg.getId();
|
||||
String data = gatewayRpcResponseMsg.getData();
|
||||
TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg = TransportProtos.ToDeviceRpcResponseMsg.newBuilder()
|
||||
@ -648,16 +648,16 @@ public class GatewaySessionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private void processRpcResponseMsg(GatewayDeviceSessionCtx deviceCtx, TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg, String deviceName, int msgId) {
|
||||
private void processRpcResponseMsg(AbstractGatewayDeviceSessionContext deviceCtx, TransportProtos.ToDeviceRpcResponseMsg rpcResponseMsg, String deviceName, int msgId) {
|
||||
transportService.process(deviceCtx.getSessionInfo(), rpcResponseMsg, getPubAckCallback(channel, deviceName, msgId, rpcResponseMsg));
|
||||
}
|
||||
|
||||
private void processGetAttributeRequestMessage(MqttPublishMessage mqttMsg, String deviceName, TransportProtos.GetAttributeRequestMsg requestMsg) {
|
||||
int msgId = getMsgId(mqttMsg);
|
||||
Futures.addCallback(checkDeviceConnected(deviceName),
|
||||
new FutureCallback<GatewayDeviceSessionCtx>() {
|
||||
new FutureCallback<AbstractGatewayDeviceSessionContext>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable GatewayDeviceSessionCtx deviceCtx) {
|
||||
public void onSuccess(@Nullable AbstractGatewayDeviceSessionContext deviceCtx) {
|
||||
transportService.process(deviceCtx.getSessionInfo(), requestMsg, getPubAckCallback(channel, deviceName, msgId, requestMsg));
|
||||
}
|
||||
|
||||
@ -681,8 +681,8 @@ public class GatewaySessionHandler {
|
||||
return result.build();
|
||||
}
|
||||
|
||||
private ListenableFuture<GatewayDeviceSessionCtx> checkDeviceConnected(String deviceName) {
|
||||
GatewayDeviceSessionCtx ctx = devices.get(deviceName);
|
||||
private ListenableFuture<AbstractGatewayDeviceSessionContext> checkDeviceConnected(String deviceName) {
|
||||
AbstractGatewayDeviceSessionContext ctx = devices.get(deviceName);
|
||||
if (ctx == null) {
|
||||
log.debug("[{}] Missing device [{}] for the gateway session", sessionId, deviceName);
|
||||
return onDeviceConnect(deviceName, DEFAULT_DEVICE_TYPE);
|
||||
@ -723,7 +723,7 @@ public class GatewaySessionHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private void deregisterSession(String deviceName, GatewayDeviceSessionCtx deviceSessionCtx) {
|
||||
private void deregisterSession(String deviceName, AbstractGatewayDeviceSessionContext deviceSessionCtx) {
|
||||
transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null);
|
||||
log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName);
|
||||
@ -46,7 +46,7 @@ public class GatewaySessionHandlerTest {
|
||||
|
||||
@Test
|
||||
public void givenConcurrentReferenceHashMap_WhenGC_thenMapIsEmpty() {
|
||||
GatewaySessionHandler gsh = mock(GatewaySessionHandler.class);
|
||||
AbstractGatewaySessionHandler gsh = mock(AbstractGatewaySessionHandler.class);
|
||||
willCallRealMethod().given(gsh).createWeakMap();
|
||||
|
||||
ConcurrentMap<String, Lock> map = gsh.createWeakMap();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user