diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java index 018d80f013..6e5ca1669f 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java @@ -492,12 +492,14 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene boolean hasData = false; for (Object v : value) { Object[] array = (Object[]) v; - dataBuilder.addTs((long) array[0]); + TbSubscriptionUpdateTsValue.Builder tsValueBuilder = TbSubscriptionUpdateTsValue.newBuilder(); + tsValueBuilder.setTs((long) array[0]); String strVal = (String) array[1]; if (strVal != null) { hasData = true; - dataBuilder.addValue(strVal); + tsValueBuilder.setValue(strVal); } + dataBuilder.addTsValue(tsValueBuilder.build()); } if (!ignoreEmptyUpdates || hasData) { builder.addData(dataBuilder.build()); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java index 978d7aaac3..25b67d3a06 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java @@ -42,6 +42,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseP import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionKetStateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto; +import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateTsValue; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -171,10 +172,11 @@ public class TbSubscriptionUtils { Map> data = new TreeMap<>(); proto.getDataList().forEach(v -> { List values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>()); - for (int i = 0; i < v.getTsCount(); i++) { + for (int i = 0; i < v.getTsValueCount(); i++) { Object[] value = new Object[2]; - value[0] = v.getTs(i); - value[1] = v.getValue(i); + TbSubscriptionUpdateTsValue tsValue = v.getTsValue(i); + value[0] = tsValue.getTs(); + value[1] = tsValue.hasValue() ? tsValue.getValue() : null; values.add(value); } }); diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 5b220974ef..dca92c1c7a 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -14,6 +14,7 @@ * limitations under the License. */ syntax = "proto3"; + package transport; option java_package = "org.thingsboard.server.gen.transport"; @@ -581,8 +582,12 @@ message TbSubscriptionKetStateProto { message TbSubscriptionUpdateValueListProto { string key = 1; - repeated int64 ts = 2; - repeated string value = 3; + repeated TbSubscriptionUpdateTsValue tsValue = 2; +} + +message TbSubscriptionUpdateTsValue { + int64 ts = 1; + optional string value = 2; } /** diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/DefaultLwM2mTransportService.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/DefaultLwM2mTransportService.java index 15aeac095c..147e20c02b 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/DefaultLwM2mTransportService.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/DefaultLwM2mTransportService.java @@ -17,7 +17,6 @@ package org.thingsboard.server.transport.lwm2m.server; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.eclipse.californium.elements.util.SslContextUtil; import org.eclipse.californium.scandium.config.DtlsConnectorConfig; import org.eclipse.californium.scandium.dtls.cipher.CipherSuite; import org.eclipse.leshan.core.node.codec.DefaultLwM2mNodeDecoder; @@ -29,6 +28,7 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.cache.ota.OtaPackageDataCache; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.transport.config.ssl.SslCredentials; +import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MAuthorizer; @@ -37,10 +37,7 @@ import org.thingsboard.server.transport.lwm2m.server.store.TbSecurityStore; import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2MUplinkMsgHandler; import org.thingsboard.server.transport.lwm2m.utils.LwM2mValueConverterImpl; -import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.security.PrivateKey; -import java.security.PublicKey; import java.security.cert.X509Certificate; import static org.eclipse.californium.scandium.dtls.cipher.CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256; @@ -71,7 +68,7 @@ public class DefaultLwM2mTransportService implements LwM2MTransportService { private LeshanServer server; - @PostConstruct + @AfterStartUp public void init() { this.server = getLhServer(); /* diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java index da304821da..e2b05847f7 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java @@ -88,7 +88,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { private final Map lwM2mClientsByRegistrationId = new ConcurrentHashMap<>(); private final Map profiles = new ConcurrentHashMap<>(); - @AfterStartUp + @AfterStartUp(order = Integer.MAX_VALUE - 1) public void init() { String nodeId = context.getNodeId(); Set fetchedClients = clientStore.getAll(); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java index 161ca3d2f1..a728a0b6e0 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/rpc/DefaultLwM2MRpcRequestHandler.java @@ -87,6 +87,12 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { return; } LwM2mClient client = clientContext.getClientBySessionInfo(sessionInfo); + + if (client == null) { + log.warn("Missing client for session: [{}]", sessionInfo); + return; + } + if (client.getRegistration() == null) { this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty"); return;