diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java index e2b05847f7..246444ddaa 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClientContextImpl.java @@ -41,6 +41,7 @@ import org.thingsboard.server.transport.lwm2m.secure.TbLwM2MSecurityInfo; import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportContext; import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil; import org.thingsboard.server.transport.lwm2m.server.LwM2mVersionedModelProvider; +import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfigService; import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService; import org.thingsboard.server.transport.lwm2m.server.session.LwM2MSessionManager; import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MClientStore; @@ -76,6 +77,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { private final LwM2MSessionManager sessionManager; private final TransportDeviceProfileCache deviceProfileCache; private final LwM2mVersionedModelProvider modelProvider; + private final LwM2MModelConfigService modelConfigService; @Autowired @Lazy @@ -259,6 +261,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { // TODO: change tests to use new certificate. // this.securityStore.remove(client.getEndpoint(), registration.getId()); clientStore.remove(client.getEndpoint()); + modelConfigService.removeUpdates(client.getEndpoint()); UUID profileId = client.getProfileId(); if (profileId != null) { Optional otherClients = lwM2mClientsByRegistrationId.values().stream().filter(e -> e.getProfileId().equals(profileId)).findFirst(); @@ -333,6 +336,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext { if (LwM2MClientState.REGISTERED.equals(lwM2MClient.getState())) { PowerMode powerMode = getPowerMode(lwM2MClient); if (PowerMode.PSM.equals(powerMode) || PowerMode.E_DRX.equals(powerMode)) { + modelConfigService.sendUpdates(lwM2MClient); defaultLwM2MUplinkMsgHandler.initAttributes(lwM2MClient); TransportProtos.TransportToDeviceActorMsg persistentRpcRequestMsg = TransportProtos.TransportToDeviceActorMsg .newBuilder() diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DownlinkRequestCallback.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DownlinkRequestCallback.java index 8912c51960..40cf896f55 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DownlinkRequestCallback.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/downlink/DownlinkRequestCallback.java @@ -17,9 +17,9 @@ package org.thingsboard.server.transport.lwm2m.server.downlink; public interface DownlinkRequestCallback { - default boolean onSent(R request){ + default boolean onSent(R request) { return true; - }; + } void onSuccess(R request, T response); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfig.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfig.java new file mode 100644 index 0000000000..5c0acc150c --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfig.java @@ -0,0 +1,92 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.model; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.device.data.lwm2m.ObjectAttributes; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.thingsboard.common.util.CollectionsUtil.diffSets; + +@Data +@NoArgsConstructor +@ToString(exclude = "toCancelRead") +@Slf4j +public class LwM2MModelConfig { + private String endpoint; + private Map attributesToAdd; + private Set attributesToRemove; + private Set toObserve; + private Set toCancelObserve; + private Set toRead; + @JsonIgnore + private Set toCancelRead; + + public LwM2MModelConfig(String endpoint) { + this.endpoint = endpoint; + this.attributesToAdd = new ConcurrentHashMap<>(); + this.attributesToRemove = ConcurrentHashMap.newKeySet(); + this.toObserve = ConcurrentHashMap.newKeySet(); + this.toCancelObserve = ConcurrentHashMap.newKeySet(); + this.toRead = ConcurrentHashMap.newKeySet(); + this.toCancelRead = new HashSet<>(); + } + + public void merge(LwM2MModelConfig modelConfig) { + if (modelConfig.isEmpty() && modelConfig.getToCancelRead().isEmpty()) { + return; + } + + modelConfig.getAttributesToAdd().forEach((k, v) -> { + if (this.attributesToRemove.contains(k)) { + this.attributesToRemove.remove(k); + } else { + this.attributesToAdd.put(k, v); + } + }); + + modelConfig.getAttributesToRemove().forEach(k -> { + if (this.attributesToAdd.containsKey(k)) { + this.attributesToRemove.remove(k); + } else { + this.attributesToRemove.add(k); + } + }); + + this.toObserve.addAll(diffSets(this.toCancelObserve, modelConfig.getToObserve())); + this.toCancelObserve.addAll(diffSets(this.toObserve, modelConfig.getToCancelObserve())); + + this.toObserve.removeAll(modelConfig.getToCancelObserve()); + this.toCancelObserve.removeAll(modelConfig.getToObserve()); + + this.toRead.removeAll(modelConfig.getToObserve()); + this.toRead.removeAll(modelConfig.getToCancelRead()); + this.toRead.addAll(modelConfig.getToRead()); + } + + @JsonIgnore + public boolean isEmpty() { + return attributesToAdd.isEmpty() && toObserve.isEmpty() && toCancelObserve.isEmpty() && toRead.isEmpty(); + } +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigService.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigService.java new file mode 100644 index 0000000000..9670b7791b --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigService.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.model; + +import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; + +public interface LwM2MModelConfigService { + + void sendUpdates(LwM2mClient lwM2mClient); + + void sendUpdates(LwM2mClient lwM2mClient, LwM2MModelConfig modelConfig); + + void persistUpdates(String endpoint); + + void removeUpdates(String endpoint); +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImpl.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImpl.java new file mode 100644 index 0000000000..9f0a84cbce --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/model/LwM2MModelConfigServiceImpl.java @@ -0,0 +1,235 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.model; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.device.data.lwm2m.ObjectAttributes; +import org.thingsboard.server.queue.util.AfterStartUp; +import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; +import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; +import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext; +import org.thingsboard.server.transport.lwm2m.server.downlink.DownlinkRequestCallback; +import org.thingsboard.server.transport.lwm2m.server.downlink.LwM2mDownlinkMsgHandler; +import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveCallback; +import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MCancelObserveRequest; +import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MObserveCallback; +import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MObserveRequest; +import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MReadCallback; +import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MReadRequest; +import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesCallback; +import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesRequest; +import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; +import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MModelConfigStore; +import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; + +import javax.annotation.PreDestroy; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +@Slf4j +@Service +@TbLwM2mTransportComponent +public class LwM2MModelConfigServiceImpl implements LwM2MModelConfigService { + + @Autowired + private TbLwM2MModelConfigStore modelStore; + + @Autowired + @Lazy + private LwM2mDownlinkMsgHandler downlinkMsgHandler; + @Autowired + @Lazy + private LwM2mUplinkMsgHandler uplinkMsgHandler; + @Autowired + @Lazy + private LwM2mClientContext clientContext; + + @Autowired + private LwM2MTelemetryLogService logService; + + private ConcurrentMap currentModelConfigs; + + @AfterStartUp(order = Integer.MAX_VALUE - 1) + private void init() { + List models = modelStore.getAll(); + log.debug("Fetched model configs: {}", models); + currentModelConfigs = models.stream() + .collect(Collectors.toConcurrentMap(LwM2MModelConfig::getEndpoint, m -> m)); + } + + @Override + public void sendUpdates(LwM2mClient lwM2mClient) { + LwM2MModelConfig modelConfig = currentModelConfigs.get(lwM2mClient.getEndpoint()); + if (modelConfig == null || modelConfig.isEmpty()) { + return; + } + + doSend(lwM2mClient, modelConfig); + } + + public void sendUpdates(LwM2mClient lwM2mClient, LwM2MModelConfig newModelConfig) { + String endpoint = lwM2mClient.getEndpoint(); + LwM2MModelConfig modelConfig = currentModelConfigs.get(endpoint); + if (modelConfig == null || modelConfig.isEmpty()) { + modelConfig = newModelConfig; + currentModelConfigs.put(endpoint, modelConfig); + } else { + modelConfig.merge(newModelConfig); + } + + if (lwM2mClient.isAsleep()) { + modelStore.put(modelConfig); + } else { + doSend(lwM2mClient, modelConfig); + } + } + + private void doSend(LwM2mClient lwM2mClient, LwM2MModelConfig modelConfig) { + log.trace("Send LwM2M Model updates: [{}]", modelConfig); + + String endpoint = lwM2mClient.getEndpoint(); + + Map attrToAdd = modelConfig.getAttributesToAdd(); + attrToAdd.forEach((id, attributes) -> { + TbLwM2MWriteAttributesRequest request = TbLwM2MWriteAttributesRequest.builder().versionedId(id) + .attributes(attributes) + .timeout(clientContext.getRequestTimeout(lwM2mClient)).build(); + downlinkMsgHandler.sendWriteAttributesRequest(lwM2mClient, request, + createDownlinkProxyCallback(() -> { + attrToAdd.remove(id); + if (modelConfig.isEmpty()) { + modelStore.remove(endpoint); + } + }, new TbLwM2MWriteAttributesCallback(logService, lwM2mClient, id)) + ); + }); + + Set attrToRemove = modelConfig.getAttributesToRemove(); + attrToRemove.forEach((id) -> { + TbLwM2MWriteAttributesRequest request = TbLwM2MWriteAttributesRequest.builder().versionedId(id) + .attributes(new ObjectAttributes()) + .timeout(clientContext.getRequestTimeout(lwM2mClient)).build(); + downlinkMsgHandler.sendWriteAttributesRequest(lwM2mClient, request, + createDownlinkProxyCallback(() -> { + attrToRemove.remove(id); + if (modelConfig.isEmpty()) { + modelStore.remove(endpoint); + } + }, new TbLwM2MWriteAttributesCallback(logService, lwM2mClient, id)) + ); + }); + + Set toRead = modelConfig.getToRead(); + toRead.forEach(id -> { + TbLwM2MReadRequest request = TbLwM2MReadRequest.builder().versionedId(id) + .timeout(clientContext.getRequestTimeout(lwM2mClient)).build(); + downlinkMsgHandler.sendReadRequest(lwM2mClient, request, + createDownlinkProxyCallback(() -> { + toRead.remove(id); + if (modelConfig.isEmpty()) { + modelStore.remove(endpoint); + } + }, new TbLwM2MReadCallback(uplinkMsgHandler, logService, lwM2mClient, id)) + ); + }); + + Set toObserve = modelConfig.getToObserve(); + toObserve.forEach(id -> { + TbLwM2MObserveRequest request = TbLwM2MObserveRequest.builder().versionedId(id) + .timeout(clientContext.getRequestTimeout(lwM2mClient)).build(); + downlinkMsgHandler.sendObserveRequest(lwM2mClient, request, + createDownlinkProxyCallback(() -> { + toObserve.remove(id); + if (modelConfig.isEmpty()) { + modelStore.remove(endpoint); + } + }, new TbLwM2MObserveCallback(uplinkMsgHandler, logService, lwM2mClient, id)) + ); + }); + + Set toCancelObserve = modelConfig.getToCancelObserve(); + toCancelObserve.forEach(id -> { + TbLwM2MCancelObserveRequest request = TbLwM2MCancelObserveRequest.builder().versionedId(id) + .timeout(clientContext.getRequestTimeout(lwM2mClient)).build(); + downlinkMsgHandler.sendCancelObserveRequest(lwM2mClient, request, + createDownlinkProxyCallback(() -> { + toCancelObserve.remove(id); + if (modelConfig.isEmpty()) { + modelStore.remove(endpoint); + } + }, new TbLwM2MCancelObserveCallback(logService, lwM2mClient, id)) + ); + }); + } + + private DownlinkRequestCallback createDownlinkProxyCallback(Runnable processRemove, DownlinkRequestCallback callback) { + return new DownlinkRequestCallback<>() { + @Override + public void onSuccess(R request, T response) { + processRemove.run(); + callback.onSuccess(request, response); + } + + @Override + public void onValidationError(String params, String msg) { + processRemove.run(); + callback.onValidationError(params, msg); + } + + @Override + public void onError(String params, Exception e) { + try { + if (e instanceof TimeoutException) { + return; + } + processRemove.run(); + } finally { + callback.onError(params, e); + } + } + + }; + } + + @Override + public void persistUpdates(String endpoint) { + LwM2MModelConfig modelConfig = currentModelConfigs.get(endpoint); + if (modelConfig != null && !modelConfig.isEmpty()) { + modelStore.put(modelConfig); + } + } + + @Override + public void removeUpdates(String endpoint) { + currentModelConfigs.remove(endpoint); + } + + @PreDestroy + private void destroy() { + currentModelConfigs.values().forEach(model -> { + if (model != null && !model.isEmpty()) { + modelStore.put(model); + } + }); + } +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbDummyLwM2MModelConfigStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbDummyLwM2MModelConfigStore.java new file mode 100644 index 0000000000..06e55ca629 --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbDummyLwM2MModelConfigStore.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.store; + +import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfig; + +import java.util.Collections; +import java.util.List; + +public class TbDummyLwM2MModelConfigStore implements TbLwM2MModelConfigStore { + @Override + public List getAll() { + return Collections.emptyList(); + } + + @Override + public void put(LwM2MModelConfig modelConfig) { + + } + + @Override + public void remove(String endpoint) { + + } +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MModelConfigStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MModelConfigStore.java new file mode 100644 index 0000000000..74d7566279 --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2MModelConfigStore.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.store; + +import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfig; + +import java.util.List; + +public interface TbLwM2MModelConfigStore { + List getAll(); + + void put(LwM2MModelConfig modelConfig); + + void remove(String endpoint); +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mStoreFactory.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mStoreFactory.java index 9156d73181..de2f0c8de0 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mStoreFactory.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mStoreFactory.java @@ -62,6 +62,11 @@ public class TbLwM2mStoreFactory { return isRedis() ? new TbRedisLwM2MClientStore(getConnectionFactory()) : new TbDummyLwM2MClientStore(); } + @Bean + private TbLwM2MModelConfigStore modelConfigStore() { + return isRedis() ? new TbRedisLwM2MModelConfigStore(getConnectionFactory()) : new TbDummyLwM2MModelConfigStore(); + } + @Bean private TbLwM2MClientOtaInfoStore otaStore() { return isRedis() ? new TbLwM2mRedisClientOtaInfoStore(getConnectionFactory()) : new TbDummyLwM2MClientOtaInfoStore(); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java new file mode 100644 index 0000000000..9cf016f7ba --- /dev/null +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java @@ -0,0 +1,79 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.store; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.connection.RedisClusterConnection; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfig; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +@AllArgsConstructor +public class TbRedisLwM2MModelConfigStore implements TbLwM2MModelConfigStore { + private static final String MODEL_EP = "MODEL#EP#"; + private final RedisConnectionFactory connectionFactory; + + @Override + public List getAll() { + try (var connection = connectionFactory.getConnection()) { + List configs = new ArrayList<>(); + ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(MODEL_EP + "*").build(); + List> scans = new ArrayList<>(); + if (connection instanceof RedisClusterConnection) { + ((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> { + scans.add(((RedisClusterConnection) connection).scan(node, scanOptions)); + }); + } else { + scans.add(connection.scan(scanOptions)); + } + + scans.forEach(scan -> { + scan.forEachRemaining(key -> { + byte[] element = connection.get(key); + configs.add(JacksonUtil.fromBytes(element, LwM2MModelConfig.class)); + }); + }); + return configs; + } + } + + @Override + public void put(LwM2MModelConfig modelConfig) { + byte[] clientSerialized = JacksonUtil.writeValueAsBytes(modelConfig); + try (var connection = connectionFactory.getConnection()) { + connection.getSet(getKey(modelConfig.getEndpoint()), clientSerialized); + } + } + + @Override + public void remove(String endpoint) { + try (var connection = connectionFactory.getConnection()) { + connection.del(getKey(endpoint)); + } + } + + private byte[] getKey(String endpoint) { + return (MODEL_EP + endpoint).getBytes(); + } + +} diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java index 7a2d27d548..3b1aeeb5b1 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/uplink/DefaultLwM2MUplinkMsgHandler.java @@ -83,6 +83,8 @@ import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MReadRequest import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesCallback; import org.thingsboard.server.transport.lwm2m.server.downlink.TbLwM2MWriteAttributesRequest; import org.thingsboard.server.transport.lwm2m.server.log.LwM2MTelemetryLogService; +import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfig; +import org.thingsboard.server.transport.lwm2m.server.model.LwM2MModelConfigService; import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MOtaUpdateService; import org.thingsboard.server.transport.lwm2m.server.session.LwM2MSessionManager; import org.thingsboard.server.transport.lwm2m.server.store.TbLwM2MDtlsSessionStore; @@ -106,6 +108,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.thingsboard.common.util.CollectionsUtil.diffSets; import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_ERROR; import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_INFO; @@ -147,6 +150,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl private final LwM2mVersionedModelProvider modelProvider; private final RegistrationStore registrationStore; private final TbLwM2mSecurityStore securityStore; + private final LwM2MModelConfigService modelConfigService; public DefaultLwM2MUplinkMsgHandler(TransportService transportService, LwM2MTransportServerConfig config, @@ -161,7 +165,8 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl TbLwM2MDtlsSessionStore sessionStore, LwM2mVersionedModelProvider modelProvider, RegistrationStore registrationStore, - TbLwM2mSecurityStore securityStore) { + TbLwM2mSecurityStore securityStore, + LwM2MModelConfigService modelConfigService) { this.transportService = transportService; this.sessionManager = sessionManager; this.attributesService = attributesService; @@ -176,6 +181,7 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl this.modelProvider = modelProvider; this.registrationStore = registrationStore; this.securityStore = securityStore; + this.modelConfigService = modelConfigService; } @PostConstruct @@ -757,7 +763,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl private void onDeviceProfileUpdate(List clients, DeviceProfile deviceProfile) { var oldProfile = clientContext.getProfile(deviceProfile.getUuidId()); if (clientContext.profileUpdate(deviceProfile) != null) { - // #1 TelemetryMappingConfiguration oldTelemetryParams = oldProfile.getObserveAttr(); Set attributeSetOld = oldTelemetryParams.getAttribute(); Set telemetrySetOld = oldTelemetryParams.getTelemetry(); @@ -777,46 +782,42 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl Set observeToRemove = diffSets(observeNew, observeOld); Set newObjectsToRead = new HashSet<>(); + Set newObjectsToCancelRead = new HashSet<>(); - // #3.1 if (!attributeSetOld.equals(attributeSetNew)) { newObjectsToRead.addAll(diffSets(attributeSetOld, attributeSetNew)); + newObjectsToCancelRead.addAll(diffSets(attributeSetNew, attributeSetOld)); + } - // #3.2 if (!telemetrySetOld.equals(telemetrySetNew)) { newObjectsToRead.addAll(diffSets(telemetrySetOld, telemetrySetNew)); + newObjectsToCancelRead.addAll(diffSets(telemetrySetNew, telemetrySetOld)); } - // #3.3 if (!keyNameOld.equals(keyNameNew)) { ParametersAnalyzeResult keyNameChange = this.getAnalyzerKeyName(keyNameOld, keyNameNew); newObjectsToRead.addAll(keyNameChange.getPathPostParametersAdd()); } - // #3.4, #6 - if (!attributeLwm2mOld.equals(attributeLwm2mNew)) { - this.compareAndSendWriteAttributes(clients, attributeLwm2mOld, attributeLwm2mNew); - } + ParametersAnalyzeResult analyzerParameters = getAttributesAnalyzer(attributeLwm2mOld, attributeLwm2mNew); - // #4.1 add - if (!newObjectsToRead.isEmpty()) { - Set newObjectsToReadButNotNewInObserve = diffSets(observeToAdd, newObjectsToRead); - // update value in Resources - for (String versionedId : newObjectsToReadButNotNewInObserve) { - clients.forEach(client -> sendReadRequest(client, versionedId)); - } - } + clients.forEach(client -> { + LwM2MModelConfig modelConfig = new LwM2MModelConfig(client.getEndpoint()); + modelConfig.getToRead().addAll(diffSets(observeToAdd, newObjectsToRead)); + modelConfig.getToCancelRead().addAll(newObjectsToCancelRead); + modelConfig.getToCancelObserve().addAll(observeToRemove); + modelConfig.getToObserve().addAll(observeToAdd); - // Calculating difference between old and new flags. - if (!observeToAdd.isEmpty()) { - for (String targetId : observeToAdd) { - clients.forEach(client -> sendObserveRequest(client, targetId)); - } - } - if (!observeToRemove.isEmpty()) { - for (String targetId : observeToRemove) { - clients.forEach(client -> sendCancelObserveRequest(targetId, client)); - } - } + Set clientObjects = clientContext.getSupportedIdVerInClient(client); + Set pathToAdd = analyzerParameters.getPathPostParametersAdd().stream().filter(target -> clientObjects.contains("/" + target.split(LWM2M_SEPARATOR_PATH)[1])) + .collect(Collectors.toUnmodifiableSet()); + modelConfig.getAttributesToAdd().putAll(pathToAdd.stream().collect(Collectors.toMap(t -> t, attributeLwm2mNew::get))); + + Set pathToRemove = analyzerParameters.getPathPostParametersDel().stream().filter(target -> clientObjects.contains("/" + target.split(LWM2M_SEPARATOR_PATH)[1])) + .collect(Collectors.toUnmodifiableSet()); + modelConfig.getAttributesToRemove().addAll(pathToRemove); + + modelConfigService.sendUpdates(client, modelConfig); + }); // update value in fwInfo OtherConfiguration newLwM2mSettings = newProfile.getClientLwM2mSettings(); @@ -835,13 +836,6 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl } } - /** - * Returns new set with elements that are present in set B(new) but absent in set A(old). - */ - private static Set diffSets(Set a, Set b) { - return b.stream().filter(p -> !a.contains(p)).collect(Collectors.toSet()); - } - private ParametersAnalyzeResult getAnalyzerKeyName(Map keyNameOld, Map keyNameNew) { ParametersAnalyzeResult analyzerParameters = new ParametersAnalyzeResult(); Set paths = keyNameNew.entrySet() @@ -852,14 +846,10 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl return analyzerParameters; } - /** - * #6.1 - send update WriteAttribute - * #6.2 - send empty WriteAttribute - */ - private void compareAndSendWriteAttributes(List clients, Map lwm2mAttributesOld, Map lwm2mAttributesNew) { + private ParametersAnalyzeResult getAttributesAnalyzer(Map attributeLwm2mOld, Map attributeLwm2mNew) { ParametersAnalyzeResult analyzerParameters = new ParametersAnalyzeResult(); - Set pathOld = lwm2mAttributesOld.keySet(); - Set pathNew = lwm2mAttributesNew.keySet(); + Set pathOld = attributeLwm2mOld.keySet(); + Set pathNew = attributeLwm2mNew.keySet(); analyzerParameters.setPathPostParametersAdd(pathNew .stream().filter(p -> !pathOld.contains(p)).collect(Collectors.toSet())); analyzerParameters.setPathPostParametersDel(pathOld @@ -867,31 +857,13 @@ public class DefaultLwM2MUplinkMsgHandler extends LwM2MExecutorAwareService impl Set pathCommon = pathNew .stream().filter(pathOld::contains).collect(Collectors.toSet()); Set pathCommonChange = pathCommon - .stream().filter(p -> !lwm2mAttributesOld.get(p).equals(lwm2mAttributesNew.get(p))).collect(Collectors.toSet()); + .stream().filter(p -> !attributeLwm2mOld.get(p).equals(attributeLwm2mNew.get(p))).collect(Collectors.toSet()); analyzerParameters.getPathPostParametersAdd().addAll(pathCommonChange); - // #6 - // #6.2 - if (analyzerParameters.getPathPostParametersAdd().size() > 0) { - clients.forEach(client -> { - Set clientObjects = clientContext.getSupportedIdVerInClient(client); - Set pathSend = analyzerParameters.getPathPostParametersAdd().stream().filter(target -> clientObjects.contains("/" + target.split(LWM2M_SEPARATOR_PATH)[1])) - .collect(Collectors.toUnmodifiableSet()); - if (!pathSend.isEmpty()) { - pathSend.forEach(target -> sendWriteAttributesRequest(client, target, lwm2mAttributesNew.get(target))); - } - }); - } - // #6.2 - if (analyzerParameters.getPathPostParametersDel().size() > 0) { - clients.forEach(client -> { - Set clientObjects = clientContext.getSupportedIdVerInClient(client); - Set pathSend = analyzerParameters.getPathPostParametersDel().stream().filter(target -> clientObjects.contains("/" + target.split(LWM2M_SEPARATOR_PATH)[1])) - .collect(Collectors.toUnmodifiableSet()); - if (!pathSend.isEmpty()) { - pathSend.forEach(target -> sendWriteAttributesRequest(client, target, new ObjectAttributes())); - } - }); - } + return analyzerParameters; + } + + private void compareAndSetWriteAttributes(LwM2mClient client, ParametersAnalyzeResult analyzerParameters, Map lwm2mAttributesNew, LwM2MModelConfig modelConfig) { + } /** diff --git a/common/util/src/main/java/org/thingsboard/common/util/CollectionsUtil.java b/common/util/src/main/java/org/thingsboard/common/util/CollectionsUtil.java new file mode 100644 index 0000000000..8a2228e554 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/CollectionsUtil.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +public class CollectionsUtil { + public static boolean isEmpty(Collection collection) { + return collection == null || collection.isEmpty(); + } + + public static boolean isNotEmpty(Collection collection) { + return !isEmpty(collection); + } + + /** + * Returns new set with elements that are present in set B(new) but absent in set A(old). + */ + public static Set diffSets(Set a, Set b) { + return b.stream().filter(p -> !a.contains(p)).collect(Collectors.toSet()); + } +} diff --git a/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java b/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java index da50963bd4..6e8e5610eb 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java +++ b/common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java @@ -123,4 +123,12 @@ public class JacksonUtil { return OBJECT_MAPPER.treeToValue(tree, type); } + public static byte[] writeValueAsBytes(T value) { + try { + return OBJECT_MAPPER.writeValueAsBytes(value); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("The given Json object value: " + + value + " cannot be transformed to a String", e); + } + } }