diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServerHelperTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServerHelperTest.java new file mode 100644 index 0000000000..39d8845dc4 --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServerHelperTest.java @@ -0,0 +1,114 @@ +package org.thingsboard.server.transport.lwm2m.server; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Collections.emptyList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_TELEMETRY; + +class LwM2mTransportServerHelperTest { + + public static final String KEY_SW_STATE = "sw_state"; + public static final String DOWNLOADING = "DOWNLOADING"; + + long now; + List kvList; + ConcurrentMap keyTsLatestMap; + LwM2mTransportServerHelper helper; + LwM2mTransportContext context; + + + @BeforeEach + void setUp() { + now = System.currentTimeMillis(); + context = mock(LwM2mTransportContext.class); + helper = spy(new LwM2mTransportServerHelper(context)); + willReturn(now).given(helper).getCurrentTimeMillis(); + kvList = List.of( + TransportProtos.KeyValueProto.newBuilder().setKey(KEY_SW_STATE).setStringV(DOWNLOADING).build(), + TransportProtos.KeyValueProto.newBuilder().setKey(LOG_LWM2M_TELEMETRY).setStringV("Transport log example").build() + ); + keyTsLatestMap = new ConcurrentHashMap<>(); + } + + @Test + void givenKeyAndLatestTsMapAndCurrentTs_whenGetTs_thenVerifyNoGetTsByKeyCall() { + assertThat(helper.getTs(null, null)).isEqualTo(now); + assertThat(helper.getTs(null, keyTsLatestMap)).isEqualTo(now); + assertThat(helper.getTs(emptyList(), null)).isEqualTo(now); + assertThat(helper.getTs(emptyList(), keyTsLatestMap)).isEqualTo(now); + assertThat(helper.getTs(kvList, null)).isEqualTo(now); + + verify(helper, never()).getTsByKey(anyString(), any(ConcurrentMap.class), anyLong()); + verify(helper, times(5)).getCurrentTimeMillis(); + } + + @Test + void givenKeyAndLatestTsMapAndCurrentTs_whenGetTs_thenVerifyGetTsByKeyCallByFirstKey() { + assertThat(helper.getTs(kvList, keyTsLatestMap)).isEqualTo(now); + + verify(helper, times(1)).getTsByKey(kvList.get(0).getKey(), keyTsLatestMap, now); + verify(helper, times(1)).getTsByKey(anyString(), any(ConcurrentMap.class), anyLong()); + } + + @Test + void givenKeyAndEmptyLatestTsMap_whenGetTsByKey_thenAddToMapAndReturnNow() { + assertThat(keyTsLatestMap).as("ts latest map before").isEmpty(); + + assertThat(helper.getTsByKey(KEY_SW_STATE, keyTsLatestMap, now)).as("getTsByKey").isEqualTo(now); + + assertThat(keyTsLatestMap).as("ts latest map after").hasSize(1); + assertThat(keyTsLatestMap.get(KEY_SW_STATE)).as("key present").isNotNull(); + assertThat(keyTsLatestMap.get(KEY_SW_STATE).get()).as("ts in map by key").isEqualTo(now); + } + + @Test + void givenKeyAndLatestTsMapWithExistedKey_whenGetTsByKey_thenCallSwapOrIncrementMethod() { + keyTsLatestMap.put(KEY_SW_STATE, new AtomicLong()); + keyTsLatestMap.put("other", new AtomicLong()); + assertThat(keyTsLatestMap).as("ts latest map").hasSize(2); + willReturn(now).given(helper).compareAndSwapOrIncrementTsAtomically(any(AtomicLong.class), anyLong()); + + assertThat(helper.getTsByKey(KEY_SW_STATE, keyTsLatestMap, now)).as("getTsByKey").isEqualTo(now); + + verify(helper, times(1)).compareAndSwapOrIncrementTsAtomically(keyTsLatestMap.get(KEY_SW_STATE), now); + verify(helper, times(1)).compareAndSwapOrIncrementTsAtomically(any(AtomicLong.class), anyLong()); + } + + @Test + void givenMapWithTsValueLessThanNow_whenCompareAndSwapOrIncrementTsAtomically_thenReturnNow() { + keyTsLatestMap.put(KEY_SW_STATE, new AtomicLong(now - 1)); + assertThat(helper.compareAndSwapOrIncrementTsAtomically(keyTsLatestMap.get(KEY_SW_STATE), now)).isEqualTo(now); + } + + @Test + void givenMapWithTsValueEqualsNow_whenCompareAndSwapOrIncrementTsAtomically_thenReturnNowIncremented() { + keyTsLatestMap.put(KEY_SW_STATE, new AtomicLong(now)); + assertThat(helper.compareAndSwapOrIncrementTsAtomically(keyTsLatestMap.get(KEY_SW_STATE), now)).isEqualTo(now + 1); + } + + @Test + void givenMapWithTsValueGreaterThanNow_whenCompareAndSwapOrIncrementTsAtomically_thenReturnGreaterThanNowIncremented() { + final long nextHourTs = now + TimeUnit.HOURS.toMillis(1); + keyTsLatestMap.put(KEY_SW_STATE, new AtomicLong(nextHourTs)); + assertThat(helper.compareAndSwapOrIncrementTsAtomically(keyTsLatestMap.get(KEY_SW_STATE), now)).isEqualTo(nextHourTs + 1); + } + +} \ No newline at end of file diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServerHelper.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServerHelper.java index a50e979b77..f91aea84f7 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServerHelper.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/LwM2mTransportServerHelper.java @@ -14,21 +14,6 @@ * limitations under the License. */ package org.thingsboard.server.transport.lwm2m.server; -/** - * Copyright © 2016-2020 The Thingsboard Authors - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -37,10 +22,7 @@ import org.eclipse.leshan.core.model.DefaultDDFFileValidator; import org.eclipse.leshan.core.model.InvalidDDFFileException; import org.eclipse.leshan.core.model.ObjectModel; import org.eclipse.leshan.core.model.ResourceModel; -import org.eclipse.leshan.core.node.LwM2mPath; -import org.eclipse.leshan.core.node.LwM2mResource; import org.eclipse.leshan.core.node.codec.CodecException; -import org.eclipse.leshan.core.request.ContentFormat; import org.springframework.stereotype.Component; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; @@ -49,19 +31,17 @@ import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; -import org.thingsboard.server.transport.lwm2m.server.adaptors.LwM2MJsonAdaptor; -import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; -import org.thingsboard.server.transport.lwm2m.server.client.ResourceValue; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import static org.thingsboard.server.gen.transport.TransportProtos.KeyValueType.BOOLEAN_V; -import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.fromVersionedIdToObjectId; @Slf4j @Component @@ -70,11 +50,6 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.f public class LwM2mTransportServerHelper { private final LwM2mTransportContext context; - private final AtomicInteger atomicTs = new AtomicInteger(0); - - public long getTS() { - return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) * 1000L + (atomicTs.getAndIncrement() % 1000); - } public void sendParametersOnThingsboardAttribute(List result, SessionInfoProto sessionInfo) { PostAttributeMsg.Builder request = PostAttributeMsg.newBuilder(); @@ -83,16 +58,67 @@ public class LwM2mTransportServerHelper { context.getTransportService().process(sessionInfo, postAttributeMsg, TransportServiceCallback.EMPTY); } - public void sendParametersOnThingsboardTelemetry(List result, SessionInfoProto sessionInfo) { - PostTelemetryMsg.Builder request = PostTelemetryMsg.newBuilder(); - TransportProtos.TsKvListProto.Builder builder = TransportProtos.TsKvListProto.newBuilder(); - builder.setTs(this.getTS()); - builder.addAllKv(result); - request.addTsKvList(builder.build()); - PostTelemetryMsg postTelemetryMsg = request.build(); + public void sendParametersOnThingsboardTelemetry(List kvList, SessionInfoProto sessionInfo) { + sendParametersOnThingsboardTelemetry(kvList, sessionInfo, null); + } + + public void sendParametersOnThingsboardTelemetry(List kvList, SessionInfoProto sessionInfo, @Nullable ConcurrentMap keyTsLatestMap) { + TransportProtos.TsKvListProto tsKvList = toTsKvList(kvList, keyTsLatestMap); + + PostTelemetryMsg postTelemetryMsg = PostTelemetryMsg.newBuilder() + .addTsKvList(tsKvList) + .build(); + context.getTransportService().process(sessionInfo, postTelemetryMsg, TransportServiceCallback.EMPTY); } + TransportProtos.TsKvListProto toTsKvList(List kvList, ConcurrentMap keyTsLatestMap) { + return TransportProtos.TsKvListProto.newBuilder() + .setTs(getTs(kvList, keyTsLatestMap)) + .addAllKv(kvList) + .build(); + } + + long getTs(List kvList, ConcurrentMap keyTsLatestMap) { + if (keyTsLatestMap == null || kvList == null || kvList.isEmpty()) { + return getCurrentTimeMillis(); + } + + return getTsByKey(kvList.get(0).getKey(), keyTsLatestMap, getCurrentTimeMillis()); + } + + long getTsByKey(@Nonnull String key, @Nonnull ConcurrentMap keyTsLatestMap, final long tsNow) { + AtomicLong tsLatestAtomic = keyTsLatestMap.putIfAbsent(key, new AtomicLong(tsNow)); + if (tsLatestAtomic == null) { + return tsNow; // it is a first known timestamp for this key. return as the latest + } + + return compareAndSwapOrIncrementTsAtomically(tsLatestAtomic, tsNow); + } + + /** + * This algorithm is sensitive to wall-clock time shift. + * Once time have shifted *backward*, the latest ts never came back. + * Ts latest will be incremented until current time overtake the latest ts. + * In normal environment without race conditions method will return current ts (wall-clock) + * */ + long compareAndSwapOrIncrementTsAtomically(AtomicLong tsLatestAtomic, final long tsNow) { + long tsLatest; + while ((tsLatest = tsLatestAtomic.get()) < tsNow) { + if (tsLatestAtomic.compareAndSet(tsLatest, tsNow)) { + return tsNow; //swap successful + } + } + return tsLatestAtomic.incrementAndGet(); //return next ms + } + + /** + * For the test ability to mock system timer + * */ + long getCurrentTimeMillis() { + return System.currentTimeMillis(); + } + /** * @return - sessionInfo after access connect client */ diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java index 205ac337ad..97a3d72c33 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/client/LwM2mClient.java @@ -50,8 +50,10 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -80,6 +82,8 @@ public class LwM2mClient implements Serializable { private final Map resources; @Getter private final Map sharedAttributes; + @Getter + private final ConcurrentMap keyTsLatestMap; @Getter private TenantId tenantId; @@ -126,6 +130,7 @@ public class LwM2mClient implements Serializable { this.endpoint = endpoint; this.sharedAttributes = new ConcurrentHashMap<>(); this.resources = new ConcurrentHashMap<>(); + this.keyTsLatestMap = new ConcurrentHashMap<>(); this.state = LwM2MClientState.CREATED; this.lock = new ReentrantLock(); this.retryAttempts = new AtomicInteger(0); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/log/DefaultLwM2MTelemetryLogService.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/log/DefaultLwM2MTelemetryLogService.java index d84019f5e8..eb5dae8f4a 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/log/DefaultLwM2MTelemetryLogService.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/log/DefaultLwM2MTelemetryLogService.java @@ -39,7 +39,7 @@ public class DefaultLwM2MTelemetryLogService implements LwM2MTelemetryLogService if (logMsg.length() > 1024) { logMsg = logMsg.substring(0, 1024); } - this.helper.sendParametersOnThingsboardTelemetry(this.helper.getKvStringtoThingsboard(LOG_LWM2M_TELEMETRY, logMsg), client.getSession()); + this.helper.sendParametersOnThingsboardTelemetry(this.helper.getKvStringtoThingsboard(LOG_LWM2M_TELEMETRY, logMsg), client.getSession(), client.getKeyTsLatestMap()); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/DefaultLwM2MOtaUpdateService.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/DefaultLwM2MOtaUpdateService.java index 2972e26fba..2776b64fef 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/DefaultLwM2MOtaUpdateService.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/DefaultLwM2MOtaUpdateService.java @@ -602,7 +602,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl kvProto = TransportProtos.KeyValueProto.newBuilder().setKey(LOG_LWM2M_TELEMETRY); kvProto.setType(TransportProtos.KeyValueType.STRING_V).setStringV(log); result.add(kvProto.build()); - helper.sendParametersOnThingsboardTelemetry(result, client.getSession()); + helper.sendParametersOnThingsboardTelemetry(result, client.getSession(), client.getKeyTsLatestMap()); } private static Optional toOtaPackageUpdateStatus(FirmwareUpdateResult fwUpdateResult) { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/firmware/LwM2MClientFwOtaInfo.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/firmware/LwM2MClientFwOtaInfo.java index 49faa2760d..cfaef5f5f3 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/firmware/LwM2MClientFwOtaInfo.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/firmware/LwM2MClientFwOtaInfo.java @@ -19,12 +19,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.ToString; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MClientOtaInfo; @Data @EqualsAndHashCode(callSuper = true) @NoArgsConstructor +@ToString(callSuper = true) public class LwM2MClientFwOtaInfo extends LwM2MClientOtaInfo { private Integer deliveryMethod; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/software/LwM2MClientSwOtaInfo.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/software/LwM2MClientSwOtaInfo.java index 76795d922f..547d713251 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/software/LwM2MClientSwOtaInfo.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/ota/software/LwM2MClientSwOtaInfo.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import lombok.ToString; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.ota.OtaPackageType; import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MClientOtaInfo; @@ -29,6 +30,7 @@ import org.thingsboard.server.transport.lwm2m.server.ota.firmware.LwM2MFirmwareU @Data @EqualsAndHashCode(callSuper = true) @NoArgsConstructor +@ToString(callSuper = true) public class LwM2MClientSwOtaInfo extends LwM2MClientOtaInfo { public LwM2MClientSwOtaInfo(String endpoint, String baseUrl, LwM2MSoftwareUpdateStrategy strategy) {