added ability to send lwm2m model updates after sleeping

This commit is contained in:
YevhenBondarenko 2021-12-10 12:42:27 +02:00
parent 6d97d9466f
commit 26034e7047
12 changed files with 595 additions and 68 deletions

View File

@ -41,6 +41,7 @@ import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext; import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil; import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil;
import org.thingsboard.server.transport.lwm2m.server.LwM2mVersionedModelProvider; import org.thingsboard.server.transport.lwm2m.server.LwM2mVersionedModelProvider;
import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfigService;
import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService; import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService;
import org.thingsboard.server.transport.lwm2m.server.session.LwM2MSessionManager; import org.thingsboard.server.transport.lwm2m.server.session.LwM2MSessionManager;
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MClientStore; import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MClientStore;
@ -76,6 +77,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
private final LwM2MSessionManager sessionManager; private final LwM2MSessionManager sessionManager;
private final TransportDeviceProfileCache deviceProfileCache; private final TransportDeviceProfileCache deviceProfileCache;
private final LwM2mVersionedModelProvider modelProvider; private final LwM2mVersionedModelProvider modelProvider;
private final LwM2MModelConfigService modelConfigService;
@Autowired @Autowired
@Lazy @Lazy
@ -259,6 +261,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
// TODO: change tests to use new certificate. // TODO: change tests to use new certificate.
// this.securityStore.remove(client.getEndpoint(), registration.getId()); // this.securityStore.remove(client.getEndpoint(), registration.getId());
clientStore.remove(client.getEndpoint()); clientStore.remove(client.getEndpoint());
modelConfigService.removeUpdates(client.getEndpoint());
UUID profileId = client.getProfileId(); UUID profileId = client.getProfileId();
if (profileId != null) { if (profileId != null) {
Optional<LwM2mClient> otherClients = lwM2mClientsByRegistrationId.values().stream().filter(e -> e.getProfileId().equals(profileId)).findFirst(); Optional<LwM2mClient> otherClients = lwM2mClientsByRegistrationId.values().stream().filter(e -> e.getProfileId().equals(profileId)).findFirst();
@ -333,6 +336,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
if (LwM2MClientState.REGISTERED.equals(lwM2MClient.getState())) { if (LwM2MClientState.REGISTERED.equals(lwM2MClient.getState())) {
PowerMode powerMode = getPowerMode(lwM2MClient); PowerMode powerMode = getPowerMode(lwM2MClient);
if (PowerMode.PSM.equals(powerMode) || PowerMode.E_DRX.equals(powerMode)) { if (PowerMode.PSM.equals(powerMode) || PowerMode.E_DRX.equals(powerMode)) {
modelConfigService.sendUpdates(lwM2MClient);
defaultLwM2MUplinkMsgHandler.initAttributes(lwM2MClient); defaultLwM2MUplinkMsgHandler.initAttributes(lwM2MClient);
TransportProtos.TransportToDeviceActorMsg persistentRpcRequestMsg = TransportProtos.TransportToDeviceActorMsg TransportProtos.TransportToDeviceActorMsg persistentRpcRequestMsg = TransportProtos.TransportToDeviceActorMsg
.newBuilder() .newBuilder()

View File

@ -19,7 +19,7 @@ public interface DownlinkRequestCallback<R, T> {
default boolean onSent(R request) { default boolean onSent(R request) {
return true; return true;
}; }
void onSuccess(R request, T response); void onSuccess(R request, T response);

View File

@ -0,0 +1,92 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.model;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.device.data.lwm2m.ObjectAttributes;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.thingsboard.common.util.CollectionsUtil.diffSets;
@Data
@NoArgsConstructor
@ToString(exclude = "toCancelRead")
@Slf4j
public class LwM2MModelConfig {
private String endpoint;
private Map<String, ObjectAttributes> attributesToAdd;
private Set<String> attributesToRemove;
private Set<String> toObserve;
private Set<String> toCancelObserve;
private Set<String> toRead;
@JsonIgnore
private Set<String> toCancelRead;
public LwM2MModelConfig(String endpoint) {
this.endpoint = endpoint;
this.attributesToAdd = new ConcurrentHashMap<>();
this.attributesToRemove = ConcurrentHashMap.newKeySet();
this.toObserve = ConcurrentHashMap.newKeySet();
this.toCancelObserve = ConcurrentHashMap.newKeySet();
this.toRead = ConcurrentHashMap.newKeySet();
this.toCancelRead = new HashSet<>();
}
public void merge(LwM2MModelConfig modelConfig) {
if (modelConfig.isEmpty() && modelConfig.getToCancelRead().isEmpty()) {
return;
}
modelConfig.getAttributesToAdd().forEach((k, v) -> {
if (this.attributesToRemove.contains(k)) {
this.attributesToRemove.remove(k);
} else {
this.attributesToAdd.put(k, v);
}
});
modelConfig.getAttributesToRemove().forEach(k -> {
if (this.attributesToAdd.containsKey(k)) {
this.attributesToRemove.remove(k);
} else {
this.attributesToRemove.add(k);
}
});
this.toObserve.addAll(diffSets(this.toCancelObserve, modelConfig.getToObserve()));
this.toCancelObserve.addAll(diffSets(this.toObserve, modelConfig.getToCancelObserve()));
this.toObserve.removeAll(modelConfig.getToCancelObserve());
this.toCancelObserve.removeAll(modelConfig.getToObserve());
this.toRead.removeAll(modelConfig.getToObserve());
this.toRead.removeAll(modelConfig.getToCancelRead());
this.toRead.addAll(modelConfig.getToRead());
}
@JsonIgnore
public boolean isEmpty() {
return attributesToAdd.isEmpty() && toObserve.isEmpty() && toCancelObserve.isEmpty() && toRead.isEmpty();
}
}

View File

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.model;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
public interface LwM2MModelConfigService {
void sendUpdates(LwM2mClient lwM2mClient);
void sendUpdates(LwM2mClient lwM2mClient, LwM2MModelConfig modelConfig);
void persistUpdates(String endpoint);
void removeUpdates(String endpoint);
}

View File

@ -0,0 +1,235 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.model;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.device.data.lwm2m.ObjectAttributes;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext;
import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback;
import org.thingsboard.server.transport.lwm2m.server.downlink.LwM2mDownlinkMsgHandler;
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveCallback;
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveRequest;
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MObserveCallback;
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MObserveRequest;
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MReadCallback;
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MReadRequest;
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesCallback;
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesRequest;
import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService;
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MModelConfigStore;
import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@Slf4j
@Service
@TbLwM2mTransportComponent
public class LwM2MModelConfigServiceImpl implements LwM2MModelConfigService {
@Autowired
private TbLwM2MModelConfigStore modelStore;
@Autowired
@Lazy
private LwM2mDownlinkMsgHandler downlinkMsgHandler;
@Autowired
@Lazy
private LwM2mUplinkMsgHandler uplinkMsgHandler;
@Autowired
@Lazy
private LwM2mClientContext clientContext;
@Autowired
private LwM2MTelemetryLogService logService;
private ConcurrentMap<String, LwM2MModelConfig> currentModelConfigs;
@AfterStartUp(order = Integer.MAX_VALUE - 1)
private void init() {
List<LwM2MModelConfig> models = modelStore.getAll();
log.debug("Fetched model configs: {}", models);
currentModelConfigs = models.stream()
.collect(Collectors.toConcurrentMap(LwM2MModelConfig::getEndpoint, m -> m));
}
@Override
public void sendUpdates(LwM2mClient lwM2mClient) {
LwM2MModelConfig modelConfig = currentModelConfigs.get(lwM2mClient.getEndpoint());
if (modelConfig == null || modelConfig.isEmpty()) {
return;
}
doSend(lwM2mClient, modelConfig);
}
public void sendUpdates(LwM2mClient lwM2mClient, LwM2MModelConfig newModelConfig) {
String endpoint = lwM2mClient.getEndpoint();
LwM2MModelConfig modelConfig = currentModelConfigs.get(endpoint);
if (modelConfig == null || modelConfig.isEmpty()) {
modelConfig = newModelConfig;
currentModelConfigs.put(endpoint, modelConfig);
} else {
modelConfig.merge(newModelConfig);
}
if (lwM2mClient.isAsleep()) {
modelStore.put(modelConfig);
} else {
doSend(lwM2mClient, modelConfig);
}
}
private void doSend(LwM2mClient lwM2mClient, LwM2MModelConfig modelConfig) {
log.trace("Send LwM2M Model updates: [{}]", modelConfig);
String endpoint = lwM2mClient.getEndpoint();
Map<String, ObjectAttributes> attrToAdd = modelConfig.getAttributesToAdd();
attrToAdd.forEach((id, attributes) -> {
TbLwM2MWriteAttributesRequest request = TbLwM2MWriteAttributesRequest.builder().versionedId(id)
.attributes(attributes)
.timeout(clientContext.getRequestTimeout(lwM2mClient)).build();
downlinkMsgHandler.sendWriteAttributesRequest(lwM2mClient, request,
createDownlinkProxyCallback(() -> {
attrToAdd.remove(id);
if (modelConfig.isEmpty()) {
modelStore.remove(endpoint);
}
}, new TbLwM2MWriteAttributesCallback(logService, lwM2mClient, id))
);
});
Set<String> attrToRemove = modelConfig.getAttributesToRemove();
attrToRemove.forEach((id) -> {
TbLwM2MWriteAttributesRequest request = TbLwM2MWriteAttributesRequest.builder().versionedId(id)
.attributes(new ObjectAttributes())
.timeout(clientContext.getRequestTimeout(lwM2mClient)).build();
downlinkMsgHandler.sendWriteAttributesRequest(lwM2mClient, request,
createDownlinkProxyCallback(() -> {
attrToRemove.remove(id);
if (modelConfig.isEmpty()) {
modelStore.remove(endpoint);
}
}, new TbLwM2MWriteAttributesCallback(logService, lwM2mClient, id))
);
});
Set<String> toRead = modelConfig.getToRead();
toRead.forEach(id -> {
TbLwM2MReadRequest request = TbLwM2MReadRequest.builder().versionedId(id)
.timeout(clientContext.getRequestTimeout(lwM2mClient)).build();
downlinkMsgHandler.sendReadRequest(lwM2mClient, request,
createDownlinkProxyCallback(() -> {
toRead.remove(id);
if (modelConfig.isEmpty()) {
modelStore.remove(endpoint);
}
}, new TbLwM2MReadCallback(uplinkMsgHandler, logService, lwM2mClient, id))
);
});
Set<String> toObserve = modelConfig.getToObserve();
toObserve.forEach(id -> {
TbLwM2MObserveRequest request = TbLwM2MObserveRequest.builder().versionedId(id)
.timeout(clientContext.getRequestTimeout(lwM2mClient)).build();
downlinkMsgHandler.sendObserveRequest(lwM2mClient, request,
createDownlinkProxyCallback(() -> {
toObserve.remove(id);
if (modelConfig.isEmpty()) {
modelStore.remove(endpoint);
}
}, new TbLwM2MObserveCallback(uplinkMsgHandler, logService, lwM2mClient, id))
);
});
Set<String> toCancelObserve = modelConfig.getToCancelObserve();
toCancelObserve.forEach(id -> {
TbLwM2MCancelObserveRequest request = TbLwM2MCancelObserveRequest.builder().versionedId(id)
.timeout(clientContext.getRequestTimeout(lwM2mClient)).build();
downlinkMsgHandler.sendCancelObserveRequest(lwM2mClient, request,
createDownlinkProxyCallback(() -> {
toCancelObserve.remove(id);
if (modelConfig.isEmpty()) {
modelStore.remove(endpoint);
}
}, new TbLwM2MCancelObserveCallback(logService, lwM2mClient, id))
);
});
}
private <R, T> DownlinkRequestCallback<R, T> createDownlinkProxyCallback(Runnable processRemove, DownlinkRequestCallback<R, T> callback) {
return new DownlinkRequestCallback<>() {
@Override
public void onSuccess(R request, T response) {
processRemove.run();
callback.onSuccess(request, response);
}
@Override
public void onValidationError(String params, String msg) {
processRemove.run();
callback.onValidationError(params, msg);
}
@Override
public void onError(String params, Exception e) {
try {
if (e instanceof TimeoutException) {
return;
}
processRemove.run();
} finally {
callback.onError(params, e);
}
}
};
}
@Override
public void persistUpdates(String endpoint) {
LwM2MModelConfig modelConfig = currentModelConfigs.get(endpoint);
if (modelConfig != null && !modelConfig.isEmpty()) {
modelStore.put(modelConfig);
}
}
@Override
public void removeUpdates(String endpoint) {
currentModelConfigs.remove(endpoint);
}
@PreDestroy
private void destroy() {
currentModelConfigs.values().forEach(model -> {
if (model != null && !model.isEmpty()) {
modelStore.put(model);
}
});
}
}

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.store;
import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfig;
import java.util.Collections;
import java.util.List;
public class TbDummyLwM2MModelConfigStore implements TbLwM2MModelConfigStore {
@Override
public List<LwM2MModelConfig> getAll() {
return Collections.emptyList();
}
@Override
public void put(LwM2MModelConfig modelConfig) {
}
@Override
public void remove(String endpoint) {
}
}

View File

@ -0,0 +1,28 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.store;
import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfig;
import java.util.List;
public interface TbLwM2MModelConfigStore {
List<LwM2MModelConfig> getAll();
void put(LwM2MModelConfig modelConfig);
void remove(String endpoint);
}

View File

@ -62,6 +62,11 @@ public class TbLwM2mStoreFactory {
return isRedis() ? new TbRedisLwM2MClientStore(getConnectionFactory()) : new TbDummyLwM2MClientStore(); return isRedis() ? new TbRedisLwM2MClientStore(getConnectionFactory()) : new TbDummyLwM2MClientStore();
} }
@Bean
private TbLwM2MModelConfigStore modelConfigStore() {
return isRedis() ? new TbRedisLwM2MModelConfigStore(getConnectionFactory()) : new TbDummyLwM2MModelConfigStore();
}
@Bean @Bean
private TbLwM2MClientOtaInfoStore otaStore() { private TbLwM2MClientOtaInfoStore otaStore() {
return isRedis() ? new TbLwM2mRedisClientOtaInfoStore(getConnectionFactory()) : new TbDummyLwM2MClientOtaInfoStore(); return isRedis() ? new TbLwM2mRedisClientOtaInfoStore(getConnectionFactory()) : new TbDummyLwM2MClientOtaInfoStore();

View File

@ -0,0 +1,79 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.store;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfig;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@AllArgsConstructor
public class TbRedisLwM2MModelConfigStore implements TbLwM2MModelConfigStore {
private static final String MODEL_EP = "MODEL#EP#";
private final RedisConnectionFactory connectionFactory;
@Override
public List<LwM2MModelConfig> getAll() {
try (var connection = connectionFactory.getConnection()) {
List<LwM2MModelConfig> configs = new ArrayList<>();
ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(MODEL_EP + "*").build();
List<Cursor<byte[]>> scans = new ArrayList<>();
if (connection instanceof RedisClusterConnection) {
((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> {
scans.add(((RedisClusterConnection) connection).scan(node, scanOptions));
});
} else {
scans.add(connection.scan(scanOptions));
}
scans.forEach(scan -> {
scan.forEachRemaining(key -> {
byte[] element = connection.get(key);
configs.add(JacksonUtil.fromBytes(element, LwM2MModelConfig.class));
});
});
return configs;
}
}
@Override
public void put(LwM2MModelConfig modelConfig) {
byte[] clientSerialized = JacksonUtil.writeValueAsBytes(modelConfig);
try (var connection = connectionFactory.getConnection()) {
connection.getSet(getKey(modelConfig.getEndpoint()), clientSerialized);
}
}
@Override
public void remove(String endpoint) {
try (var connection = connectionFactory.getConnection()) {
connection.del(getKey(endpoint));
}
}
private byte[] getKey(String endpoint) {
return (MODEL_EP + endpoint).getBytes();
}
}

View File

@ -83,6 +83,8 @@ import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MReadRequest
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesCallback; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesCallback;
import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesRequest; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesRequest;
import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService;
import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfig;
import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfigService;
import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService; import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService;
import org.thingsboard.server.transport.lwm2m.server.session.LwM2MSessionManager; import org.thingsboard.server.transport.lwm2m.server.session.LwM2MSessionManager;
import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MDtlsSessionStore; import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MDtlsSessionStore;
@ -106,6 +108,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.thingsboard.common.util.CollectionsUtil.diffSets;
import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH; import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH;
import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_ERROR; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_ERROR;
import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO;
@ -147,6 +150,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
private final LwM2mVersionedModelProvider modelProvider; private final LwM2mVersionedModelProvider modelProvider;
private final RegistrationStore registrationStore; private final RegistrationStore registrationStore;
private final TbLwM2mSecurityStore securityStore; private final TbLwM2mSecurityStore securityStore;
private final LwM2MModelConfigService modelConfigService;
public DefaultLwM2MUplinkMsgHandler(TransportService transportService, public DefaultLwM2MUplinkMsgHandler(TransportService transportService,
LwM2MTransportServerConfig config, LwM2MTransportServerConfig config,
@ -161,7 +165,8 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
TbLwM2MDtlsSessionStore sessionStore, TbLwM2MDtlsSessionStore sessionStore,
LwM2mVersionedModelProvider modelProvider, LwM2mVersionedModelProvider modelProvider,
RegistrationStore registrationStore, RegistrationStore registrationStore,
TbLwM2mSecurityStore securityStore) { TbLwM2mSecurityStore securityStore,
LwM2MModelConfigService modelConfigService) {
this.transportService = transportService; this.transportService = transportService;
this.sessionManager = sessionManager; this.sessionManager = sessionManager;
this.attributesService = attributesService; this.attributesService = attributesService;
@ -176,6 +181,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
this.modelProvider = modelProvider; this.modelProvider = modelProvider;
this.registrationStore = registrationStore; this.registrationStore = registrationStore;
this.securityStore = securityStore; this.securityStore = securityStore;
this.modelConfigService = modelConfigService;
} }
@PostConstruct @PostConstruct
@ -757,7 +763,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
private void onDeviceProfileUpdate(List<LwM2mClient> clients, DeviceProfile deviceProfile) { private void onDeviceProfileUpdate(List<LwM2mClient> clients, DeviceProfile deviceProfile) {
var oldProfile = clientContext.getProfile(deviceProfile.getUuidId()); var oldProfile = clientContext.getProfile(deviceProfile.getUuidId());
if (clientContext.profileUpdate(deviceProfile) != null) { if (clientContext.profileUpdate(deviceProfile) != null) {
// #1
TelemetryMappingConfiguration oldTelemetryParams = oldProfile.getObserveAttr(); TelemetryMappingConfiguration oldTelemetryParams = oldProfile.getObserveAttr();
Set<String> attributeSetOld = oldTelemetryParams.getAttribute(); Set<String> attributeSetOld = oldTelemetryParams.getAttribute();
Set<String> telemetrySetOld = oldTelemetryParams.getTelemetry(); Set<String> telemetrySetOld = oldTelemetryParams.getTelemetry();
@ -777,46 +782,42 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
Set<String> observeToRemove = diffSets(observeNew, observeOld); Set<String> observeToRemove = diffSets(observeNew, observeOld);
Set<String> newObjectsToRead = new HashSet<>(); Set<String> newObjectsToRead = new HashSet<>();
Set<String> newObjectsToCancelRead = new HashSet<>();
// #3.1
if (!attributeSetOld.equals(attributeSetNew)) { if (!attributeSetOld.equals(attributeSetNew)) {
newObjectsToRead.addAll(diffSets(attributeSetOld, attributeSetNew)); newObjectsToRead.addAll(diffSets(attributeSetOld, attributeSetNew));
newObjectsToCancelRead.addAll(diffSets(attributeSetNew, attributeSetOld));
} }
// #3.2
if (!telemetrySetOld.equals(telemetrySetNew)) { if (!telemetrySetOld.equals(telemetrySetNew)) {
newObjectsToRead.addAll(diffSets(telemetrySetOld, telemetrySetNew)); newObjectsToRead.addAll(diffSets(telemetrySetOld, telemetrySetNew));
newObjectsToCancelRead.addAll(diffSets(telemetrySetNew, telemetrySetOld));
} }
// #3.3
if (!keyNameOld.equals(keyNameNew)) { if (!keyNameOld.equals(keyNameNew)) {
ParametersAnalyzeResult keyNameChange = this.getAnalyzerKeyName(keyNameOld, keyNameNew); ParametersAnalyzeResult keyNameChange = this.getAnalyzerKeyName(keyNameOld, keyNameNew);
newObjectsToRead.addAll(keyNameChange.getPathPostParametersAdd()); newObjectsToRead.addAll(keyNameChange.getPathPostParametersAdd());
} }
// #3.4, #6 ParametersAnalyzeResult analyzerParameters = getAttributesAnalyzer(attributeLwm2mOld, attributeLwm2mNew);
if (!attributeLwm2mOld.equals(attributeLwm2mNew)) {
this.compareAndSendWriteAttributes(clients, attributeLwm2mOld, attributeLwm2mNew);
}
// #4.1 add clients.forEach(client -> {
if (!newObjectsToRead.isEmpty()) { LwM2MModelConfig modelConfig = new LwM2MModelConfig(client.getEndpoint());
Set<String> newObjectsToReadButNotNewInObserve = diffSets(observeToAdd, newObjectsToRead); modelConfig.getToRead().addAll(diffSets(observeToAdd, newObjectsToRead));
// update value in Resources modelConfig.getToCancelRead().addAll(newObjectsToCancelRead);
for (String versionedId : newObjectsToReadButNotNewInObserve) { modelConfig.getToCancelObserve().addAll(observeToRemove);
clients.forEach(client -> sendReadRequest(client, versionedId)); modelConfig.getToObserve().addAll(observeToAdd);
}
}
// Calculating difference between old and new flags. Set<String> clientObjects = clientContext.getSupportedIdVerInClient(client);
if (!observeToAdd.isEmpty()) { Set<String> pathToAdd = analyzerParameters.getPathPostParametersAdd().stream().filter(target -> clientObjects.contains("/" + target.split(LWM2M_SEPARATOR_PATH)[1]))
for (String targetId : observeToAdd) { .collect(Collectors.toUnmodifiableSet());
clients.forEach(client -> sendObserveRequest(client, targetId)); modelConfig.getAttributesToAdd().putAll(pathToAdd.stream().collect(Collectors.toMap(t -> t, attributeLwm2mNew::get)));
}
} Set<String> pathToRemove = analyzerParameters.getPathPostParametersDel().stream().filter(target -> clientObjects.contains("/" + target.split(LWM2M_SEPARATOR_PATH)[1]))
if (!observeToRemove.isEmpty()) { .collect(Collectors.toUnmodifiableSet());
for (String targetId : observeToRemove) { modelConfig.getAttributesToRemove().addAll(pathToRemove);
clients.forEach(client -> sendCancelObserveRequest(targetId, client));
} modelConfigService.sendUpdates(client, modelConfig);
} });
// update value in fwInfo // update value in fwInfo
OtherConfiguration newLwM2mSettings = newProfile.getClientLwM2mSettings(); OtherConfiguration newLwM2mSettings = newProfile.getClientLwM2mSettings();
@ -835,13 +836,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
} }
} }
/**
* Returns new set with elements that are present in set B(new) but absent in set A(old).
*/
private static <T> Set<T> diffSets(Set<T> a, Set<T> b) {
return b.stream().filter(p -> !a.contains(p)).collect(Collectors.toSet());
}
private ParametersAnalyzeResult getAnalyzerKeyName(Map<String, String> keyNameOld, Map<String, String> keyNameNew) { private ParametersAnalyzeResult getAnalyzerKeyName(Map<String, String> keyNameOld, Map<String, String> keyNameNew) {
ParametersAnalyzeResult analyzerParameters = new ParametersAnalyzeResult(); ParametersAnalyzeResult analyzerParameters = new ParametersAnalyzeResult();
Set<String> paths = keyNameNew.entrySet() Set<String> paths = keyNameNew.entrySet()
@ -852,14 +846,10 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
return analyzerParameters; return analyzerParameters;
} }
/** private ParametersAnalyzeResult getAttributesAnalyzer(Map<String, ObjectAttributes> attributeLwm2mOld, Map<String, ObjectAttributes> attributeLwm2mNew) {
* #6.1 - send update WriteAttribute
* #6.2 - send empty WriteAttribute
*/
private void compareAndSendWriteAttributes(List<LwM2mClient> clients, Map<String, ObjectAttributes> lwm2mAttributesOld, Map<String, ObjectAttributes> lwm2mAttributesNew) {
ParametersAnalyzeResult analyzerParameters = new ParametersAnalyzeResult(); ParametersAnalyzeResult analyzerParameters = new ParametersAnalyzeResult();
Set<String> pathOld = lwm2mAttributesOld.keySet(); Set<String> pathOld = attributeLwm2mOld.keySet();
Set<String> pathNew = lwm2mAttributesNew.keySet(); Set<String> pathNew = attributeLwm2mNew.keySet();
analyzerParameters.setPathPostParametersAdd(pathNew analyzerParameters.setPathPostParametersAdd(pathNew
.stream().filter(p -> !pathOld.contains(p)).collect(Collectors.toSet())); .stream().filter(p -> !pathOld.contains(p)).collect(Collectors.toSet()));
analyzerParameters.setPathPostParametersDel(pathOld analyzerParameters.setPathPostParametersDel(pathOld
@ -867,31 +857,13 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl
Set<String> pathCommon = pathNew Set<String> pathCommon = pathNew
.stream().filter(pathOld::contains).collect(Collectors.toSet()); .stream().filter(pathOld::contains).collect(Collectors.toSet());
Set<String> pathCommonChange = pathCommon Set<String> pathCommonChange = pathCommon
.stream().filter(p -> !lwm2mAttributesOld.get(p).equals(lwm2mAttributesNew.get(p))).collect(Collectors.toSet()); .stream().filter(p -> !attributeLwm2mOld.get(p).equals(attributeLwm2mNew.get(p))).collect(Collectors.toSet());
analyzerParameters.getPathPostParametersAdd().addAll(pathCommonChange); analyzerParameters.getPathPostParametersAdd().addAll(pathCommonChange);
// #6 return analyzerParameters;
// #6.2
if (analyzerParameters.getPathPostParametersAdd().size() > 0) {
clients.forEach(client -> {
Set<String> clientObjects = clientContext.getSupportedIdVerInClient(client);
Set<String> pathSend = analyzerParameters.getPathPostParametersAdd().stream().filter(target -> clientObjects.contains("/" + target.split(LWM2M_SEPARATOR_PATH)[1]))
.collect(Collectors.toUnmodifiableSet());
if (!pathSend.isEmpty()) {
pathSend.forEach(target -> sendWriteAttributesRequest(client, target, lwm2mAttributesNew.get(target)));
}
});
}
// #6.2
if (analyzerParameters.getPathPostParametersDel().size() > 0) {
clients.forEach(client -> {
Set<String> clientObjects = clientContext.getSupportedIdVerInClient(client);
Set<String> pathSend = analyzerParameters.getPathPostParametersDel().stream().filter(target -> clientObjects.contains("/" + target.split(LWM2M_SEPARATOR_PATH)[1]))
.collect(Collectors.toUnmodifiableSet());
if (!pathSend.isEmpty()) {
pathSend.forEach(target -> sendWriteAttributesRequest(client, target, new ObjectAttributes()));
}
});
} }
private void compareAndSetWriteAttributes(LwM2mClient client, ParametersAnalyzeResult analyzerParameters, Map<String, ObjectAttributes> lwm2mAttributesNew, LwM2MModelConfig modelConfig) {
} }
/** /**

View File

@ -0,0 +1,37 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
public class CollectionsUtil {
public static boolean isEmpty(Collection<?> collection) {
return collection == null || collection.isEmpty();
}
public static boolean isNotEmpty(Collection<?> collection) {
return !isEmpty(collection);
}
/**
* Returns new set with elements that are present in set B(new) but absent in set A(old).
*/
public static <T> Set<T> diffSets(Set<T> a, Set<T> b) {
return b.stream().filter(p -> !a.contains(p)).collect(Collectors.toSet());
}
}

View File

@ -123,4 +123,12 @@ public class JacksonUtil {
return OBJECT_MAPPER.treeToValue(tree, type); return OBJECT_MAPPER.treeToValue(tree, type);
} }
public static <T> byte[] writeValueAsBytes(T value) {
try {
return OBJECT_MAPPER.writeValueAsBytes(value);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("The given Json object value: "
+ value + " cannot be transformed to a String", e);
}
}
} }