Merge TS fix using cherry-pick

This commit is contained in:
Sergey Matvienko 2021-08-13 14:44:46 +03:00 committed by Andrii Shvaika
parent 5834bbd75d
commit 6d20ca441e
7 changed files with 187 additions and 38 deletions

View File

@ -0,0 +1,114 @@
package org.thingsboard.server.transport.lwm2m.server;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.emptyList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.LOG_LWM2M_TELEMETRY;
class LwM2mTransportServerHelperTest {
public static final String KEY_SW_STATE = "sw_state";
public static final String DOWNLOADING = "DOWNLOADING";
long now;
List<TransportProtos.KeyValueProto> kvList;
ConcurrentMap<String, AtomicLong> keyTsLatestMap;
LwM2mTransportServerHelper helper;
LwM2mTransportContext context;
@BeforeEach
void setUp() {
now = System.currentTimeMillis();
context = mock(LwM2mTransportContext.class);
helper = spy(new LwM2mTransportServerHelper(context));
willReturn(now).given(helper).getCurrentTimeMillis();
kvList = List.of(
TransportProtos.KeyValueProto.newBuilder().setKey(KEY_SW_STATE).setStringV(DOWNLOADING).build(),
TransportProtos.KeyValueProto.newBuilder().setKey(LOG_LWM2M_TELEMETRY).setStringV("Transport log example").build()
);
keyTsLatestMap = new ConcurrentHashMap<>();
}
@Test
void givenKeyAndLatestTsMapAndCurrentTs_whenGetTs_thenVerifyNoGetTsByKeyCall() {
assertThat(helper.getTs(null, null)).isEqualTo(now);
assertThat(helper.getTs(null, keyTsLatestMap)).isEqualTo(now);
assertThat(helper.getTs(emptyList(), null)).isEqualTo(now);
assertThat(helper.getTs(emptyList(), keyTsLatestMap)).isEqualTo(now);
assertThat(helper.getTs(kvList, null)).isEqualTo(now);
verify(helper, never()).getTsByKey(anyString(), any(ConcurrentMap.class), anyLong());
verify(helper, times(5)).getCurrentTimeMillis();
}
@Test
void givenKeyAndLatestTsMapAndCurrentTs_whenGetTs_thenVerifyGetTsByKeyCallByFirstKey() {
assertThat(helper.getTs(kvList, keyTsLatestMap)).isEqualTo(now);
verify(helper, times(1)).getTsByKey(kvList.get(0).getKey(), keyTsLatestMap, now);
verify(helper, times(1)).getTsByKey(anyString(), any(ConcurrentMap.class), anyLong());
}
@Test
void givenKeyAndEmptyLatestTsMap_whenGetTsByKey_thenAddToMapAndReturnNow() {
assertThat(keyTsLatestMap).as("ts latest map before").isEmpty();
assertThat(helper.getTsByKey(KEY_SW_STATE, keyTsLatestMap, now)).as("getTsByKey").isEqualTo(now);
assertThat(keyTsLatestMap).as("ts latest map after").hasSize(1);
assertThat(keyTsLatestMap.get(KEY_SW_STATE)).as("key present").isNotNull();
assertThat(keyTsLatestMap.get(KEY_SW_STATE).get()).as("ts in map by key").isEqualTo(now);
}
@Test
void givenKeyAndLatestTsMapWithExistedKey_whenGetTsByKey_thenCallSwapOrIncrementMethod() {
keyTsLatestMap.put(KEY_SW_STATE, new AtomicLong());
keyTsLatestMap.put("other", new AtomicLong());
assertThat(keyTsLatestMap).as("ts latest map").hasSize(2);
willReturn(now).given(helper).compareAndSwapOrIncrementTsAtomically(any(AtomicLong.class), anyLong());
assertThat(helper.getTsByKey(KEY_SW_STATE, keyTsLatestMap, now)).as("getTsByKey").isEqualTo(now);
verify(helper, times(1)).compareAndSwapOrIncrementTsAtomically(keyTsLatestMap.get(KEY_SW_STATE), now);
verify(helper, times(1)).compareAndSwapOrIncrementTsAtomically(any(AtomicLong.class), anyLong());
}
@Test
void givenMapWithTsValueLessThanNow_whenCompareAndSwapOrIncrementTsAtomically_thenReturnNow() {
keyTsLatestMap.put(KEY_SW_STATE, new AtomicLong(now - 1));
assertThat(helper.compareAndSwapOrIncrementTsAtomically(keyTsLatestMap.get(KEY_SW_STATE), now)).isEqualTo(now);
}
@Test
void givenMapWithTsValueEqualsNow_whenCompareAndSwapOrIncrementTsAtomically_thenReturnNowIncremented() {
keyTsLatestMap.put(KEY_SW_STATE, new AtomicLong(now));
assertThat(helper.compareAndSwapOrIncrementTsAtomically(keyTsLatestMap.get(KEY_SW_STATE), now)).isEqualTo(now + 1);
}
@Test
void givenMapWithTsValueGreaterThanNow_whenCompareAndSwapOrIncrementTsAtomically_thenReturnGreaterThanNowIncremented() {
final long nextHourTs = now + TimeUnit.HOURS.toMillis(1);
keyTsLatestMap.put(KEY_SW_STATE, new AtomicLong(nextHourTs));
assertThat(helper.compareAndSwapOrIncrementTsAtomically(keyTsLatestMap.get(KEY_SW_STATE), now)).isEqualTo(nextHourTs + 1);
}
}

View File

@ -14,21 +14,6 @@
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server;
/**
* Copyright © 2016-2020 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -37,10 +22,7 @@ import org.eclipse.leshan.core.model.DefaultDDFFileValidator;
import org.eclipse.leshan.core.model.InvalidDDFFileException;
import org.eclipse.leshan.core.model.ObjectModel;
import org.eclipse.leshan.core.model.ResourceModel;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.LwM2mResource;
import org.eclipse.leshan.core.node.codec.CodecException;
import org.eclipse.leshan.core.request.ContentFormat;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
@ -49,19 +31,17 @@ import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
import org.thingsboard.server.transport.lwm2m.server.adaptors.LwM2MJsonAdaptor;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
import org.thingsboard.server.transport.lwm2m.server.client.ResourceValue;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import static org.thingsboard.server.gen.transport.TransportProtos.KeyValueType.BOOLEAN_V;
import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.fromVersionedIdToObjectId;
@Slf4j
@Component
@ -70,11 +50,6 @@ import static org.thingsboard.server.transport.lwm2m.server.LwM2mTransportUtil.f
public class LwM2mTransportServerHelper {
private final LwM2mTransportContext context;
private final AtomicInteger atomicTs = new AtomicInteger(0);
public long getTS() {
return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) * 1000L + (atomicTs.getAndIncrement() % 1000);
}
public void sendParametersOnThingsboardAttribute(List<TransportProtos.KeyValueProto> result, SessionInfoProto sessionInfo) {
PostAttributeMsg.Builder request = PostAttributeMsg.newBuilder();
@ -83,16 +58,67 @@ public class LwM2mTransportServerHelper {
context.getTransportService().process(sessionInfo, postAttributeMsg, TransportServiceCallback.EMPTY);
}
public void sendParametersOnThingsboardTelemetry(List<TransportProtos.KeyValueProto> result, SessionInfoProto sessionInfo) {
PostTelemetryMsg.Builder request = PostTelemetryMsg.newBuilder();
TransportProtos.TsKvListProto.Builder builder = TransportProtos.TsKvListProto.newBuilder();
builder.setTs(this.getTS());
builder.addAllKv(result);
request.addTsKvList(builder.build());
PostTelemetryMsg postTelemetryMsg = request.build();
public void sendParametersOnThingsboardTelemetry(List<TransportProtos.KeyValueProto> kvList, SessionInfoProto sessionInfo) {
sendParametersOnThingsboardTelemetry(kvList, sessionInfo, null);
}
public void sendParametersOnThingsboardTelemetry(List<TransportProtos.KeyValueProto> kvList, SessionInfoProto sessionInfo, @Nullable ConcurrentMap<String, AtomicLong> keyTsLatestMap) {
TransportProtos.TsKvListProto tsKvList = toTsKvList(kvList, keyTsLatestMap);
PostTelemetryMsg postTelemetryMsg = PostTelemetryMsg.newBuilder()
.addTsKvList(tsKvList)
.build();
context.getTransportService().process(sessionInfo, postTelemetryMsg, TransportServiceCallback.EMPTY);
}
TransportProtos.TsKvListProto toTsKvList(List<TransportProtos.KeyValueProto> kvList, ConcurrentMap<String, AtomicLong> keyTsLatestMap) {
return TransportProtos.TsKvListProto.newBuilder()
.setTs(getTs(kvList, keyTsLatestMap))
.addAllKv(kvList)
.build();
}
long getTs(List<TransportProtos.KeyValueProto> kvList, ConcurrentMap<String, AtomicLong> keyTsLatestMap) {
if (keyTsLatestMap == null || kvList == null || kvList.isEmpty()) {
return getCurrentTimeMillis();
}
return getTsByKey(kvList.get(0).getKey(), keyTsLatestMap, getCurrentTimeMillis());
}
long getTsByKey(@Nonnull String key, @Nonnull ConcurrentMap<String, AtomicLong> keyTsLatestMap, final long tsNow) {
AtomicLong tsLatestAtomic = keyTsLatestMap.putIfAbsent(key, new AtomicLong(tsNow));
if (tsLatestAtomic == null) {
return tsNow; // it is a first known timestamp for this key. return as the latest
}
return compareAndSwapOrIncrementTsAtomically(tsLatestAtomic, tsNow);
}
/**
* This algorithm is sensitive to wall-clock time shift.
* Once time have shifted *backward*, the latest ts never came back.
* Ts latest will be incremented until current time overtake the latest ts.
* In normal environment without race conditions method will return current ts (wall-clock)
* */
long compareAndSwapOrIncrementTsAtomically(AtomicLong tsLatestAtomic, final long tsNow) {
long tsLatest;
while ((tsLatest = tsLatestAtomic.get()) < tsNow) {
if (tsLatestAtomic.compareAndSet(tsLatest, tsNow)) {
return tsNow; //swap successful
}
}
return tsLatestAtomic.incrementAndGet(); //return next ms
}
/**
* For the test ability to mock system timer
* */
long getCurrentTimeMillis() {
return System.currentTimeMillis();
}
/**
* @return - sessionInfo after access connect client
*/

View File

@ -50,8 +50,10 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@ -80,6 +82,8 @@ public class LwM2mClient implements Serializable {
private final Map<String, ResourceValue> resources;
@Getter
private final Map<String, TsKvProto> sharedAttributes;
@Getter
private final ConcurrentMap<String, AtomicLong> keyTsLatestMap;
@Getter
private TenantId tenantId;
@ -126,6 +130,7 @@ public class LwM2mClient implements Serializable {
this.endpoint = endpoint;
this.sharedAttributes = new ConcurrentHashMap<>();
this.resources = new ConcurrentHashMap<>();
this.keyTsLatestMap = new ConcurrentHashMap<>();
this.state = LwM2MClientState.CREATED;
this.lock = new ReentrantLock();
this.retryAttempts = new AtomicInteger(0);

View File

@ -39,7 +39,7 @@ public class DefaultLwM2MTelemetryLogService implements LwM2MTelemetryLogService
if (logMsg.length() > 1024) {
logMsg = logMsg.substring(0, 1024);
}
this.helper.sendParametersOnThingsboardTelemetry(this.helper.getKvStringtoThingsboard(LOG_LWM2M_TELEMETRY, logMsg), client.getSession());
this.helper.sendParametersOnThingsboardTelemetry(this.helper.getKvStringtoThingsboard(LOG_LWM2M_TELEMETRY, logMsg), client.getSession(), client.getKeyTsLatestMap());
}
}

View File

@ -602,7 +602,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
kvProto = TransportProtos.KeyValueProto.newBuilder().setKey(LOG_LWM2M_TELEMETRY);
kvProto.setType(TransportProtos.KeyValueType.STRING_V).setStringV(log);
result.add(kvProto.build());
helper.sendParametersOnThingsboardTelemetry(result, client.getSession());
helper.sendParametersOnThingsboardTelemetry(result, client.getSession(), client.getKeyTsLatestMap());
}
private static Optional<OtaPackageUpdateStatus> toOtaPackageUpdateStatus(FirmwareUpdateResult fwUpdateResult) {

View File

@ -19,12 +19,14 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MClientOtaInfo;
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@ToString(callSuper = true)
public class LwM2MClientFwOtaInfo extends LwM2MClientOtaInfo<LwM2MFirmwareUpdateStrategy, FirmwareUpdateState, FirmwareUpdateResult> {
private Integer deliveryMethod;

View File

@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.ota.OtaPackageType;
import org.thingsboard.server.transport.lwm2m.server.ota.LwM2MClientOtaInfo;
@ -29,6 +30,7 @@ import org.thingsboard.server.transport.lwm2m.server.ota.firmware.LwM2MFirmwareU
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@ToString(callSuper = true)
public class LwM2MClientSwOtaInfo extends LwM2MClientOtaInfo<LwM2MSoftwareUpdateStrategy, SoftwareUpdateState, SoftwareUpdateResult> {
public LwM2MClientSwOtaInfo(String endpoint, String baseUrl, LwM2MSoftwareUpdateStrategy strategy) {