Merge branch 'rc' into fix/3912-device-table

This commit is contained in:
Max Petrov 2024-09-30 16:34:16 +03:00 committed by GitHub
commit 4f11a320c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 546 additions and 219 deletions

View File

@ -37,7 +37,7 @@
},
{
"tag": "icon",
"stateRenderFunction": "var showIcon = ctx.properties.showIcon;\nif (showIcon) {\n element.show();\n var icon = ctx.properties.icon;\n var iconSize = ctx.properties.iconSize;\n var iconColor = ctx.properties.iconColor;\n ctx.api.icon(element, icon, iconSize, iconColor, true);\n} else {\n element.hide()\n}",
"stateRenderFunction": "var showIcon = ctx.properties.showIcon;\nvar showLabel = ctx.properties.label;\nif (showIcon) {\n element.show();\n var icon = ctx.properties.icon;\n var iconSize = ctx.properties.iconSize;\n var iconColor = ctx.properties.iconColor;\n ctx.api.icon(element, icon, iconSize, iconColor, true);\n if (!showLabel) {\n element.transform({translateX: 83,translateY: 137});\n }\n} else {\n element.hide()\n}\n",
"actions": null
},
{

Before

Width:  |  Height:  |  Size: 56 KiB

After

Width:  |  Height:  |  Size: 56 KiB

View File

@ -37,7 +37,7 @@
},
{
"tag": "icon",
"stateRenderFunction": "var showIcon = ctx.properties.showIcon;\nif (showIcon) {\n element.show();\n var icon = ctx.properties.icon;\n var iconSize = ctx.properties.iconSize;\n var iconColor = ctx.properties.iconColor;\n ctx.api.icon(element, icon, iconSize, iconColor, true);\n} else {\n element.hide()\n}",
"stateRenderFunction": "var showIcon = ctx.properties.showIcon;\nvar showLabel = ctx.properties.label;\nif (showIcon) {\n element.show();\n var icon = ctx.properties.icon;\n var iconSize = ctx.properties.iconSize;\n var iconColor = ctx.properties.iconColor;\n ctx.api.icon(element, icon, iconSize, iconColor, true);\n if (!showLabel) {\n element.transform({translateX: 119, translateY: 137});\n }\n} else {\n element.hide()\n}",
"actions": null
},
{

Before

Width:  |  Height:  |  Size: 56 KiB

After

Width:  |  Height:  |  Size: 56 KiB

View File

@ -95,7 +95,7 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
() -> {
RuleChain ruleChain = provider.apply(ruleChainId);
if (ruleChain == null) {
return new RuleChainErrorActor.ActorCreator(systemContext, tenantId,
return new RuleChainErrorActor.ActorCreator(systemContext, tenantId, ruleChainId,
new RuleEngineException("Rule Chain with id: " + ruleChainId + " not found!"));
} else {
return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain);

View File

@ -19,16 +19,15 @@ import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorId;
import org.thingsboard.server.actors.TbStringActorId;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import java.util.UUID;
@Slf4j
public class RuleChainErrorActor extends ContextAwareActor {
@ -43,9 +42,8 @@ public class RuleChainErrorActor extends ContextAwareActor {
@Override
protected boolean doProcess(TbActorMsg msg) {
if (msg instanceof RuleChainAwareMsg) {
if (msg instanceof RuleChainAwareMsg rcMsg) {
log.debug("[{}] Reply with {} for message {}", tenantId, error.getMessage(), msg);
var rcMsg = (RuleChainAwareMsg) msg;
rcMsg.getMsg().getCallback().onFailure(error);
return true;
} else {
@ -56,17 +54,19 @@ public class RuleChainErrorActor extends ContextAwareActor {
public static class ActorCreator extends ContextBasedCreator {
private final TenantId tenantId;
private final RuleChainId ruleChainId;
private final RuleEngineException error;
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleEngineException error) {
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId ruleChainId, RuleEngineException error) {
super(context);
this.tenantId = tenantId;
this.ruleChainId = ruleChainId;
this.error = error;
}
@Override
public TbActorId createActorId() {
return new TbStringActorId(UUID.randomUUID().toString());
return new TbEntityActorId(ruleChainId);
}
@Override

View File

@ -20,6 +20,7 @@ import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import java.util.HashSet;
import java.util.Set;
/**
@ -48,7 +49,7 @@ public class TbSubscriptionsInfo {
}
protected TbSubscriptionsInfo copy(int seqNumber) {
return new TbSubscriptionsInfo(notifications, alarms, tsAllKeys, tsKeys, attrAllKeys, attrKeys, seqNumber);
return new TbSubscriptionsInfo(notifications, alarms, tsAllKeys, tsKeys != null ? new HashSet<>(tsKeys) : null, attrAllKeys, attrKeys != null ? new HashSet<>(attrKeys) : null, seqNumber);
}
}

View File

@ -18,57 +18,130 @@ package org.thingsboard.server.actors.tenant;
import org.junit.Before;
import org.junit.Test;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.DefaultTbActorSystem;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorMailbox;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.TbActorSystem;
import org.thingsboard.server.actors.TbActorSystemSettings;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.ruleChain.RuleChainActor;
import org.thingsboard.server.actors.ruleChain.RuleChainToRuleChainMsg;
import org.thingsboard.server.actors.shared.RuleChainErrorActor;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.rule.engine.DeviceDeleteMsg;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.thingsboard.server.actors.service.DefaultActorService.RULE_DISPATCHER_NAME;
public class TenantActorTest {
TenantActor tenantActor;
TbActorCtx ctx;
ActorSystemContext systemContext;
RuleChainService ruleChainService;
PartitionService partitionService;
TenantId tenantId = TenantId.SYS_TENANT_ID;
DeviceId deviceId = DeviceId.fromString("78bf9b26-74ef-4af2-9cfb-ad6cf24ad2ec");
RuleChainId ruleChainId = new RuleChainId(UUID.fromString("48cfa2b0-3dca-11ef-8d1a-37c2894cc59c"));
@Before
public void setUp() throws Exception {
systemContext = mock(ActorSystemContext.class);
ctx = mock(TbActorCtx.class);
ruleChainService = mock(RuleChainService.class);
partitionService = mock();
TbServiceInfoProvider serviceInfoProvider = mock(TbServiceInfoProvider.class);
TbApiUsageStateService apiUsageService = mock(TbApiUsageStateService.class);
TenantService tenantService = mock(TenantService.class);
when(systemContext.getRuleChainService()).thenReturn(ruleChainService);
tenantActor = (TenantActor) new TenantActor.ActorCreator(systemContext, tenantId).createActor();
when(systemContext.getTenantService()).thenReturn(mock(TenantService.class));
tenantActor.init(ctx);
tenantActor.cantFindTenant = false;
when(tenantService.findTenantById(tenantId)).thenReturn(mock());
when(systemContext.getTenantService()).thenReturn(tenantService);
when(serviceInfoProvider.isService(ServiceType.TB_CORE)).thenReturn(true);
when(serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)).thenReturn(true);
when(systemContext.getServiceInfoProvider()).thenReturn(serviceInfoProvider);
when(partitionService.isManagedByCurrentService(tenantId)).thenReturn(true);
when(systemContext.getPartitionService()).thenReturn(partitionService);
when(systemContext.getApiUsageStateService()).thenReturn(apiUsageService);
when(apiUsageService.getApiUsageState(tenantId)).thenReturn(new ApiUsageState());
}
@Test
public void deleteDeviceTest() {
public void deleteDeviceTest() throws Exception {
TbActorCtx ctx = mock(TbActorCtx.class);
tenantActor.init(ctx);
TbActorRef deviceActorRef = mock(TbActorRef.class);
when(systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(new TopicPartitionInfo("Main", tenantId, 0,true));
when(systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(new TopicPartitionInfo("Main", tenantId, 0, true));
when(ctx.getOrCreateChildActor(any(), any(), any(), any())).thenReturn(deviceActorRef);
ComponentLifecycleMsg componentLifecycleMsg = new ComponentLifecycleMsg(tenantId, deviceId, ComponentLifecycleEvent.DELETED);
tenantActor.doProcess(componentLifecycleMsg);
verify(deviceActorRef).tellWithHighPriority(eq(new DeviceDeleteMsg(tenantId, deviceId)));
reset(ctx, deviceActorRef);
when(systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(new TopicPartitionInfo("Main", tenantId, 1,false));
when(systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(new TopicPartitionInfo("Main", tenantId, 1, false));
tenantActor.doProcess(componentLifecycleMsg);
verify(ctx, never()).getOrCreateChildActor(any(), any(), any(), any());
verify(deviceActorRef, never()).tellWithHighPriority(any());
}
@Test
public void ruleChainErrorActorTest() throws Exception {
TbActorSystemSettings settings = new TbActorSystemSettings(0, 0, 0);
TbActorSystem system = spy(new DefaultTbActorSystem(settings));
system.createDispatcher(RULE_DISPATCHER_NAME, mock());
TbActorMailbox tenantCtx = new TbActorMailbox(system, settings, null, mock(), mock(), null);
tenantActor.init(tenantCtx);
TbMsg msg = mock(TbMsg.class);
when(ruleChainService.findRuleChainById(tenantId, ruleChainId)).thenReturn(new RuleChain(ruleChainId));
RuleChainToRuleChainMsg ruleChainMsg = new RuleChainToRuleChainMsg(ruleChainId, null, msg, null);
tenantActor.doProcess(ruleChainMsg);
verify(system).createChildActor(eq(RULE_DISPATCHER_NAME), any(RuleChainActor.ActorCreator.class), any());
reset(system);
tenantActor.doProcess(ruleChainMsg);
verify(system, never()).createChildActor(any(), any(), any());
//Delete rule-chain
TbActorRef ruleChainActor = system.getActor(new TbEntityActorId(ruleChainId));
assertNotNull(ruleChainActor);
system.stop(ruleChainActor);
when(ruleChainService.findRuleChainById(tenantId, ruleChainId)).thenReturn(null);
tenantActor.doProcess(ruleChainMsg);
verify(system).createChildActor(eq(RULE_DISPATCHER_NAME), any(RuleChainErrorActor.ActorCreator.class), any());
reset(system);
tenantActor.doProcess(ruleChainMsg);
verify(system, never()).createChildActor(any(), any(), any());
system.stop();
}
}

View File

@ -0,0 +1,126 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.subscription;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.cache.limits.RateLimitService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class DefaultTbLocalSubscriptionServiceTest {
ListAppender<ILoggingEvent> testLogAppender;
TbLocalSubscriptionService subscriptionService;
@BeforeEach
public void setUp() throws Exception {
Logger logger = (Logger) LoggerFactory.getLogger(DefaultTbLocalSubscriptionService.class);
testLogAppender = new ListAppender<>();
testLogAppender.start();
logger.addAppender(testLogAppender);
RateLimitService rateLimitService = mock();
when(rateLimitService.checkRateLimit(eq(LimitedApi.WS_SUBSCRIPTIONS), any(Object.class), nullable(String.class))).thenReturn(true);
PartitionService partitionService = mock();
when(partitionService.resolve(any(), any(), any())).thenReturn(TopicPartitionInfo.builder().build());
subscriptionService = new DefaultTbLocalSubscriptionService(mock(), mock(), mock(), partitionService, mock(), mock(), mock(), rateLimitService);
ReflectionTestUtils.setField(subscriptionService, "serviceId", "serviceId");
}
@AfterEach
public void tearDown() {
if (testLogAppender != null) {
testLogAppender.stop();
Logger logger = (Logger) LoggerFactory.getLogger(DefaultTbLocalSubscriptionService.class);
logger.detachAppender(testLogAppender);
}
}
@Test
public void addSubscriptionConcurrentModificationTest() throws Exception {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
TenantId tenantId = new TenantId(UUID.randomUUID());
DeviceId deviceId = new DeviceId(UUID.randomUUID());
WebSocketSessionRef sessionRef = mock();
ReflectionTestUtils.setField(subscriptionService, "subscriptionUpdateExecutor", executorService);
List<ListenableFuture<?>> futures = new ArrayList<>();
try {
subscriptionService.onCoreStartupMsg(TransportProtos.CoreStartupMsg.newBuilder().addAllPartitions(List.of(0)).getDefaultInstanceForType());
for (int i = 0; i < 50; i++) {
futures.add(executorService.submit(() -> subscriptionService.addSubscription(createSubscription(tenantId, deviceId), sessionRef)));
}
Futures.allAsList(futures).get();
} finally {
executorService.shutdownNow();
}
List<ILoggingEvent> logs = testLogAppender.list;
boolean exceptionLogged = logs.stream()
.filter(event -> event.getThrowableProxy() != null)
.map(event -> event.getThrowableProxy().getClassName())
.anyMatch(log -> log.equals("java.util.ConcurrentModificationException"));
assertFalse(exceptionLogged, "Detected ConcurrentModificationException!");
}
private TbSubscription<?> createSubscription(TenantId tenantId, EntityId entityId) {
Map<String, Long> keys = new HashMap<>();
for (int i = 0; i < 50; i++) {
keys.put(RandomStringUtils.randomAlphanumeric(5), 1L);
}
return TbAttributeSubscription.builder()
.tenantId(tenantId)
.entityId(entityId)
.subscriptionId(1)
.sessionId(RandomStringUtils.randomAlphanumeric(5))
.keyStates(keys)
.build();
}
}

View File

@ -42,6 +42,7 @@ public class Lwm2mTestHelper {
public static final int RESOURCE_ID_11 = 11;
public static final int RESOURCE_ID_14 = 14;
public static final int RESOURCE_ID_15 = 15;
public static final int RESOURCE_ID_5700 = 5700;
public static final int RESOURCE_INSTANCE_ID_0 = 0;
public static final int RESOURCE_INSTANCE_ID_2 = 2;
@ -51,6 +52,12 @@ public class Lwm2mTestHelper {
public static final String RESOURCE_ID_NAME_19_0_2 = "dataCreationTime";
public static final String RESOURCE_ID_NAME_19_1_0 = "dataWrite";
public static final String RESOURCE_ID_NAME_19_0_3 = "dataDescription";
public static final String RESOURCE_ID_NAME_3303_12_5700 = "sensorValue";
public static final double RESOURCE_ID_3303_12_5700_VALUE_0 = 25.05d;
public static final double RESOURCE_ID_3303_12_5700_VALUE_1 = 35.12d;
public static long RESOURCE_ID_3303_12_5700_TS_0 = 0;
public static long RESOURCE_ID_3303_12_5700_TS_1 = 0;
public static final int RESOURCE_ID_VALUE_3303_12_5700_DELTA_TS = 3000;
public enum LwM2MClientState {
@ -72,8 +79,8 @@ public class Lwm2mTestHelper {
ON_DEREGISTRATION_FAILURE(14, "onDeregistrationFailure"),
ON_DEREGISTRATION_TIMEOUT(15, "onDeregistrationTimeout"),
ON_EXPECTED_ERROR(16, "onUnexpectedError"),
ON_READ_CONNECTION_ID (17, "onReadConnection"),
ON_WRITE_CONNECTION_ID (18, "onWriteConnection");
ON_READ_CONNECTION_ID(17, "onReadConnection"),
ON_WRITE_CONNECTION_ID(18, "onWriteConnection");
public int code;
public String type;

View File

@ -137,7 +137,6 @@ public class LwM2MTestClient {
private Map<LwM2MClientState, Integer> clientDtlsCid;
private LwM2mUplinkMsgHandler defaultLwM2mUplinkMsgHandlerTest;
private LwM2mClientContext clientContext;
public void init(Security security, Security securityBs, int port, boolean isRpc,
LwM2mUplinkMsgHandler defaultLwM2mUplinkMsgHandler,
LwM2mClientContext clientContext, boolean isWriteAttribute, Integer cIdLength, boolean queueMode,
@ -159,11 +158,11 @@ public class LwM2MTestClient {
initializer.setClassForObject(SECURITY, Security.class);
initializer.setInstancesForObject(SECURITY, instances);
// SERVER
Server lwm2mServer = new Server(shortServerId, TimeUnit.MINUTES.toSeconds(60));
Server lwm2mServer = new Server(shortServerId, TimeUnit.MINUTES.toSeconds(60));
lwm2mServer.setId(serverId);
Server serverBs = new Server(shortServerIdBs0, TimeUnit.MINUTES.toSeconds(60));
Server serverBs = new Server(shortServerIdBs0, TimeUnit.MINUTES.toSeconds(60));
serverBs.setId(serverIdBs);
instances = new LwM2mInstanceEnabler[]{serverBs, lwm2mServer};
instances = new LwM2mInstanceEnabler[]{serverBs, lwm2mServer};
initializer.setClassForObject(SERVER, Server.class);
initializer.setInstancesForObject(SERVER, instances);
} else if (securityBs != null) {
@ -177,7 +176,7 @@ public class LwM2MTestClient {
// SERVER
Server lwm2mServer = new Server(shortServerId, TimeUnit.MINUTES.toSeconds(60));
lwm2mServer.setId(serverId);
initializer.setInstancesForObject(SERVER, lwm2mServer );
initializer.setInstancesForObject(SERVER, lwm2mServer);
}
initializer.setInstancesForObject(DEVICE, lwM2MDevice = new SimpleLwM2MDevice(executor));
@ -239,11 +238,11 @@ public class LwM2MTestClient {
boolean supportDeprecatedCiphers = false;
clientCoapConfig.set(DTLS_RECOMMENDED_CIPHER_SUITES_ONLY, !supportDeprecatedCiphers);
if (cIdLength!= null) {
if (cIdLength != null) {
setDtlsConnectorConfigCidLength(clientCoapConfig, cIdLength);
}
if (cIdLength!= null) {
if (cIdLength != null) {
setDtlsConnectorConfigCidLength(clientCoapConfig, cIdLength);
}
@ -262,12 +261,12 @@ public class LwM2MTestClient {
// Configure Registration Engine
DefaultRegistrationEngineFactory engineFactory = new DefaultRegistrationEngineFactory();
// old
// old
/**
* Force reconnection/rehandshake on registration update.
*/
int comPeriodInSec = 5;
if (comPeriodInSec > 0) engineFactory.setCommunicationPeriod(comPeriodInSec * 1000);
if (comPeriodInSec > 0) engineFactory.setCommunicationPeriod(comPeriodInSec * 1000);
// engineFactory.setCommunicationPeriod(5000); // old
/**
* By default client will try to resume DTLS session by using abbreviated Handshake. This option force to always do a full handshake."
@ -288,7 +287,7 @@ public class LwM2MTestClient {
builder.setDataSenders(new ManualDataSender());
builder.setRegistrationEngineFactory(engineFactory);
Map<ContentFormat, NodeDecoder> decoders = new HashMap<>();
Map<ContentFormat, NodeEncoder> encoders = new HashMap<>();
Map<ContentFormat, NodeEncoder> encoders = new HashMap<>();
if (supportFormatOnly_SenMLJSON_SenMLCBOR) {
// decoders.put(ContentFormat.OPAQUE, new LwM2mNodeOpaqueDecoder());
decoders.put(ContentFormat.CBOR, new LwM2mNodeCborDecoder());

View File

@ -26,16 +26,20 @@ import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.request.argument.Arguments;
import org.eclipse.leshan.core.response.ExecuteResponse;
import org.eclipse.leshan.core.response.ReadResponse;
import org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper;
import javax.security.auth.Destroyable;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_VALUE_0;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_VALUE_1;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_VALUE_3303_12_5700_DELTA_TS;
@Slf4j
public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destroyable {
@ -46,7 +50,9 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr
private double maxMeasuredValue = currentTemp;
private LeshanClient leshanClient;
private List<Double> containingValues;
private int cntRead_5700;
private int cntIdentitySystem;
protected static final Random RANDOM = new Random();
private static final List<Integer> supportedResources = Arrays.asList(5601, 5602, 5700, 5701);
@ -57,7 +63,7 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr
public LwM2mTemperatureSensor(ScheduledExecutorService executorService, Integer id) {
try {
if (id != null) this.setId(id);
executorService.scheduleWithFixedDelay(this::adjustTemperature, 2000, 2000, TimeUnit.MILLISECONDS);
executorService.scheduleWithFixedDelay(this::adjustTemperature, 2000, 2000, TimeUnit.MILLISECONDS);
} catch (Throwable e) {
log.error("[{}]Throwable", e.toString());
e.printStackTrace();
@ -73,15 +79,18 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr
case 5602:
return ReadResponse.success(resourceId, getTwoDigitValue(maxMeasuredValue));
case 5700:
if (identity == LwM2mServer.SYSTEM) {
setTemperature();
setData();
if (identity == LwM2mServer.SYSTEM) { // return value for ForCollectedValue
cntIdentitySystem++;
return ReadResponse.success(resourceId, cntIdentitySystem == 1 ?
RESOURCE_ID_3303_12_5700_VALUE_0 : RESOURCE_ID_3303_12_5700_VALUE_1);
}
cntRead_5700++;
if (cntRead_5700 == 1) { // read value after start
return ReadResponse.success(resourceId, getTwoDigitValue(currentTemp));
} else if (this.getId() == 12 && this.leshanClient != null) {
containingValues = new ArrayList<>();
sendCollected(5700);
return ReadResponse.success(resourceId, getData());
} else {
if (this.getId() == 12 && this.leshanClient != null) {
sendCollected();
}
return ReadResponse.success(resourceId, getTwoDigitValue(currentTemp));
}
case 5701:
@ -117,10 +126,11 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr
}
}
private void setTemperature(){
private void setTemperature() {
float delta = (RANDOM.nextInt(20) - 10) / 10f;
currentTemp += delta;
}
private synchronized Integer adjustMinMaxMeasuredValue(double newTemperature) {
if (newTemperature > maxMeasuredValue) {
maxMeasuredValue = newTemperature;
@ -143,7 +153,7 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr
return supportedResources;
}
protected void setLeshanClient(LeshanClient leshanClient){
protected void setLeshanClient(LeshanClient leshanClient) {
this.leshanClient = leshanClient;
}
@ -151,40 +161,26 @@ public class LwM2mTemperatureSensor extends BaseInstanceEnabler implements Destr
public void destroy() {
}
private void sendCollected(int resourceId) {
private void sendCollected() {
try {
int resourceId = 5700;
LwM2mServer registeredServer = this.leshanClient.getRegisteredServers().values().iterator().next();
ManualDataSender sender = this.leshanClient.getSendService().getDataSender(ManualDataSender.DEFAULT_NAME,
ManualDataSender.class);
sender.collectData(Arrays.asList(getPathForCollectedValue(resourceId)));
Thread.sleep(1000);
Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_TS_0 = Instant.now().toEpochMilli();
Thread.sleep(RESOURCE_ID_VALUE_3303_12_5700_DELTA_TS);
sender.collectData(Arrays.asList(getPathForCollectedValue(resourceId)));
Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_TS_1 = Instant.now().toEpochMilli();
sender.sendCollectedData(registeredServer, ContentFormat.SENML_JSON, 1000, false);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private LwM2mPath getPathForCollectedValue(int resourceId) {
return new LwM2mPath(3303, this.getId(), resourceId);
}
private double getData() {
if (containingValues.size() > 1) {
Integer t0 = Math.toIntExact(Math.round(containingValues.get(0) * 100));
Integer t1 = Math.toIntExact(Math.round(containingValues.get(1) * 100));
long to_t1 = (((long) t0) << 32) | (t1 & 0xffffffffL);
return Double.longBitsToDouble(to_t1);
} else {
return currentTemp;
}
}
private void setData() {
if (containingValues == null){
containingValues = new ArrayList<>();
}
containingValues.add(getTwoDigitValue(currentTemp));
}
}

View File

@ -15,20 +15,30 @@
*/
package org.thingsboard.server.transport.lwm2m.rpc;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.core.link.LinkParser;
import org.eclipse.leshan.core.link.lwm2m.DefaultLwM2mLinkParser;
import org.junit.Before;
import org.mockito.Mockito;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials;
import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.lwm2m.AbstractLwM2MIntegrationTest;
import org.thingsboard.server.transport.lwm2m.server.LwM2mTransportServerHelper;
import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2mUplinkMsgHandler;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import static org.awaitility.Awaitility.await;
import static org.eclipse.leshan.core.LwM2mId.ACCESS_CONTROL;
import static org.eclipse.leshan.core.LwM2mId.DEVICE;
import static org.eclipse.leshan.core.LwM2mId.FIRMWARE;
@ -40,19 +50,23 @@ import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_ID_0
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_ID_1;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INSTANCE_ID_0;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INSTANCE_ID_1;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INSTANCE_ID_12;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_0;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_14;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_2;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_5700;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_9;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_19_0_0;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_19_0_2;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_19_1_0;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3303_12_5700;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3_14;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3_9;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.TEMPERATURE_SENSOR;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.resources;
import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.fromVersionedIdToObjectId;
@Slf4j
@DaoSqlTest
public abstract class AbstractRpcLwM2MIntegrationTest extends AbstractLwM2MIntegrationTest {
@ -84,6 +98,12 @@ public abstract class AbstractRpcLwM2MIntegrationTest extends AbstractLwM2MInteg
protected String idVer_19_0_0;
@SpyBean
protected DefaultLwM2mUplinkMsgHandler defaultUplinkMsgHandlerTest;
@SpyBean
protected LwM2mTransportServerHelper lwM2mTransportServerHelperTest;
public AbstractRpcLwM2MIntegrationTest() {
setResources(resources);
}
@ -144,7 +164,8 @@ public abstract class AbstractRpcLwM2MIntegrationTest extends AbstractLwM2MInteg
" \"" + objectIdVer_3 + "/" + OBJECT_INSTANCE_ID_0 + "/" + RESOURCE_ID_14 + "\": \"" + RESOURCE_ID_NAME_3_14 + "\",\n" +
" \"" + idVer_19_0_0 + "\": \"" + RESOURCE_ID_NAME_19_0_0 + "\",\n" +
" \"" + objectIdVer_19 + "/" + OBJECT_INSTANCE_ID_1 + "/" + RESOURCE_ID_0 + "\": \"" + RESOURCE_ID_NAME_19_1_0 + "\",\n" +
" \"" + objectIdVer_19 + "/" + OBJECT_INSTANCE_ID_0 + "/" + RESOURCE_ID_2 + "\": \"" + RESOURCE_ID_NAME_19_0_2 + "\"\n" +
" \"" + objectIdVer_19 + "/" + OBJECT_INSTANCE_ID_0 + "/" + RESOURCE_ID_2 + "\": \"" + RESOURCE_ID_NAME_19_0_2 + "\",\n" +
" \"" + objectIdVer_3303 + "/" + OBJECT_INSTANCE_ID_12 + "/" + RESOURCE_ID_5700 + "\": \"" + RESOURCE_ID_NAME_3303_12_5700 + "\"\n" +
" },\n" +
" \"observe\": [\n" +
" \"" + idVer_3_0_9 + "\",\n" +
@ -159,7 +180,8 @@ public abstract class AbstractRpcLwM2MIntegrationTest extends AbstractLwM2MInteg
" \"telemetry\": [\n" +
" \"" + idVer_3_0_9 + "\",\n" +
" \"" + idVer_19_0_0 + "\",\n" +
" \"" + objectIdVer_19 + "/" + OBJECT_INSTANCE_ID_1 + "/" + RESOURCE_ID_0 + "\"\n" +
" \"" + objectIdVer_19 + "/" + OBJECT_INSTANCE_ID_1 + "/" + RESOURCE_ID_0 + "\",\n" +
" \"" + objectIdVer_3303 + "/" + OBJECT_INSTANCE_ID_12 + "/" + RESOURCE_ID_5700 + "\"\n" +
" ],\n" +
" \"attributeLwm2m\": {}\n" +
" }";
@ -183,4 +205,56 @@ public abstract class AbstractRpcLwM2MIntegrationTest extends AbstractLwM2MInteg
return pathIdVer;
}
protected long countUpdateAttrTelemetryAll() {
return Mockito.mockingDetails(defaultUplinkMsgHandlerTest)
.getInvocations().stream()
.filter(invocation -> invocation.getMethod().getName().equals("updateAttrTelemetry"))
.count();
}
protected long countUpdateAttrTelemetryResource(String idVerRez) {
return Mockito.mockingDetails(defaultUplinkMsgHandlerTest)
.getInvocations().stream()
.filter(invocation ->
invocation.getMethod().getName().equals("updateAttrTelemetry") &&
invocation.getArguments().length > 1 &&
idVerRez.equals(invocation.getArguments()[1])
)
.count();
}
protected void updateRegAtLeastOnceAfterAction() {
long initialInvocationCount = countUpdateReg();
AtomicLong newInvocationCount = new AtomicLong(initialInvocationCount);
log.trace("updateRegAtLeastOnceAfterAction: initialInvocationCount [{}]", initialInvocationCount);
await("Update Registration at-least-once after action")
.atMost(50, TimeUnit.SECONDS)
.until(() -> {
newInvocationCount.set(countUpdateReg());
return newInvocationCount.get() > initialInvocationCount;
});
log.trace("updateRegAtLeastOnceAfterAction: newInvocationCount [{}]", newInvocationCount.get());
}
protected long countUpdateReg() {
return Mockito.mockingDetails(defaultUplinkMsgHandlerTest)
.getInvocations().stream()
.filter(invocation -> invocation.getMethod().getName().equals("updatedReg"))
.count();
}
protected long countSendParametersOnThingsboardTelemetryResource(String rezName) {
return Mockito.mockingDetails(lwM2mTransportServerHelperTest)
.getInvocations().stream()
.filter(invocation ->
invocation.getMethod().getName().equals("sendParametersOnThingsboardTelemetry") &&
invocation.getArguments().length > 0 &&
invocation.getArguments()[0] instanceof List &&
((List<?>) invocation.getArguments()[0]).stream()
.filter(arg -> arg instanceof TransportProtos.KeyValueProto)
.anyMatch(arg -> rezName.equals(((TransportProtos.KeyValueProto) arg).getKey()))
)
.count();
}
}

View File

@ -20,11 +20,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.core.ResponseCode;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.transport.lwm2m.rpc.AbstractRpcLwM2MIntegrationObserveTest;
import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2mUplinkMsgHandler;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@ -55,10 +52,6 @@ import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.fr
@Slf4j
public class RpcLwm2MIntegrationObserveCompositeTest extends AbstractRpcLwM2MIntegrationObserveTest {
@SpyBean
DefaultLwM2mUplinkMsgHandler defaultUplinkMsgHandlerTest;
/**
* ObserveComposite {"ids":["5/0/7", "5/0/5", "5/0/3", "3/0/9", "19/1/0/0"]} - Ok
* @throws Exception
@ -517,13 +510,6 @@ public class RpcLwm2MIntegrationObserveCompositeTest extends AbstractRpcLwM2MInt
return doPostAsync("/api/plugins/rpc/twoway/" + deviceId, sendRpcRequest, String.class, status().isOk());
}
private long countUpdateAttrTelemetryAll() {
return Mockito.mockingDetails(defaultUplinkMsgHandlerTest)
.getInvocations().stream()
.filter(invocation -> invocation.getMethod().getName().equals("updateAttrTelemetry"))
.count();
}
private void updateAttrTelemetryAllAtLeastOnceAfterAction(long initialInvocationCount) {
AtomicLong newInvocationCount = new AtomicLong(initialInvocationCount);
log.warn("countUpdateAttrTelemetryAllAtLeastOnceAfterAction: initialInvocationCount [{}]", initialInvocationCount);
@ -536,19 +522,6 @@ public class RpcLwm2MIntegrationObserveCompositeTest extends AbstractRpcLwM2MInt
log.warn("countUpdateAttrTelemetryAllAtLeastOnceAfterAction: newInvocationCount [{}]", newInvocationCount.get());
}
private long countUpdateAttrTelemetryResource(String idVerRez) {
return Mockito.mockingDetails(defaultUplinkMsgHandlerTest)
.getInvocations().stream()
.filter(invocation ->
invocation.getMethod().getName().equals("updateAttrTelemetry") &&
invocation.getArguments().length > 1 &&
idVerRez.equals(invocation.getArguments()[1])
)
.count();
}
private void updateAttrTelemetryResourceAtLeastOnceAfterAction(long initialInvocationCount, String idVerRez) {
AtomicLong newInvocationCount = new AtomicLong(initialInvocationCount);
log.warn("countUpdateAttrTelemetryResourceAtLeastOnceAfterAction: initialInvocationCount [{}]", initialInvocationCount);
@ -560,24 +533,4 @@ public class RpcLwm2MIntegrationObserveCompositeTest extends AbstractRpcLwM2MInt
});
log.warn("countUpdateAttrTelemetryResourceAtLeastOnceAfterAction: newInvocationCount [{}]", newInvocationCount.get());
}
private long countUpdateReg() {
return Mockito.mockingDetails(defaultUplinkMsgHandlerTest)
.getInvocations().stream()
.filter(invocation -> invocation.getMethod().getName().equals("updatedReg"))
.count();
}
private void updateRegAtLeastOnceAfterAction() {
long initialInvocationCount = countUpdateReg();
AtomicLong newInvocationCount = new AtomicLong(initialInvocationCount);
log.warn("updateRegAtLeastOnceAfterAction: initialInvocationCount [{}]", initialInvocationCount);
await("Update Registration at-least-once after action")
.atMost(50, TimeUnit.SECONDS)
.until(() -> {
newInvocationCount.set(countUpdateReg());
return newInvocationCount.get() > initialInvocationCount;
});
log.warn("updateRegAtLeastOnceAfterAction: newInvocationCount [{}]", newInvocationCount.get());
}
}

View File

@ -24,9 +24,7 @@ import org.eclipse.leshan.core.response.ReadResponse;
import org.eclipse.leshan.server.registration.Registration;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.thingsboard.server.transport.lwm2m.rpc.AbstractRpcLwM2MIntegrationObserveTest;
import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2mUplinkMsgHandler;
import java.util.Optional;
@ -41,14 +39,12 @@ import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INST
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_0;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_2;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3_9;
import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.fromVersionedIdToObjectId;
@Slf4j
public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationObserveTest {
@SpyBean
DefaultLwM2mUplinkMsgHandler defaultUplinkMsgHandlerTest;
@Test
public void testObserveReadAll_Count_4_CancelAll_Count_0_Ok() throws Exception {
String actualValuesReadAll = sendRpcObserveOkWithResultValue("ObserveReadAll", null);
@ -64,12 +60,12 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO
*/
@Test
public void testObserveOneResource_Result_CONTENT_Value_Count_3_After_Cancel_Count_2() throws Exception {
long initSendTelemetryAtCount = countSendParametersOnThingsboardTelemetryResource(RESOURCE_ID_NAME_3_9);
sendObserveCancelAllWithAwait(deviceId);
sendRpcObserveWithContainsLwM2mSingleResource(idVer_3_0_9);
int cntUpdate = 3;
verify(defaultUplinkMsgHandlerTest, timeout(10000).times(cntUpdate))
.onUpdateValueAfterReadResponse(Mockito.any(Registration.class), eq(idVer_3_0_9), Mockito.any(ReadResponse.class));
updateRegAtLeastOnceAfterAction();
long lastSendTelemetryAtCount = countSendParametersOnThingsboardTelemetryResource(RESOURCE_ID_NAME_3_9);
assertTrue(lastSendTelemetryAtCount > initSendTelemetryAtCount);
}
/**
@ -84,7 +80,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO
int cntUpdate = 3;
verify(defaultUplinkMsgHandlerTest, timeout(10000).times(cntUpdate))
.updateAttrTelemetry(Mockito.any(Registration.class), eq(idVer_3_0_9));
.updateAttrTelemetry(Mockito.any(Registration.class), eq(idVer_3_0_9), eq(null));
}
/**
@ -99,7 +95,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO
int cntUpdate = 3;
verify(defaultUplinkMsgHandlerTest, timeout(10000).times(cntUpdate))
.updateAttrTelemetry(Mockito.any(Registration.class), eq(idVer_3_0_9));
.updateAttrTelemetry(Mockito.any(Registration.class), eq(idVer_3_0_9), eq(null));
}
/**
@ -334,7 +330,7 @@ public class RpcLwm2mIntegrationObserveTest extends AbstractRpcLwM2MIntegrationO
cntUpdate = 10;
verify(defaultUplinkMsgHandlerTest, timeout(50000).atLeast(cntUpdate))
.updateAttrTelemetry(Mockito.any(Registration.class), eq(idVer_3_0_9));
.updateAttrTelemetry(Mockito.any(Registration.class), eq(idVer_3_0_9), eq(null));
}
private void sendRpcObserveWithWithTwoResource(String expectedId_1, String expectedId_2) throws Exception {

View File

@ -15,30 +15,26 @@
*/
package org.thingsboard.server.transport.lwm2m.rpc.sql;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.map.HashedMap;
import org.eclipse.leshan.core.ResponseCode;
import org.eclipse.leshan.core.node.LwM2mNode;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.LwM2mResource;
import org.eclipse.leshan.core.node.TimestampedLwM2mNodes;
import org.eclipse.leshan.server.registration.Registration;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.transport.lwm2m.rpc.AbstractRpcLwM2MIntegrationTest;
import org.thingsboard.server.transport.lwm2m.server.uplink.DefaultLwM2mUplinkMsgHandler;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.eclipse.leshan.core.LwM2mId.SERVER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.BINARY_APP_DATA_CONTAINER;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.OBJECT_INSTANCE_ID_0;
@ -49,20 +45,22 @@ import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_11;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_14;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_2;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_TS_0;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_TS_1;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_9;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_19_0_0;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_19_0_3;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_19_1_0;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3303_12_5700;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3_14;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_NAME_3_9;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_VALUE_0;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_3303_12_5700_VALUE_1;
import static org.thingsboard.server.transport.lwm2m.Lwm2mTestHelper.RESOURCE_ID_VALUE_3303_12_5700_DELTA_TS;
@Slf4j
public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest {
@SpyBean
DefaultLwM2mUplinkMsgHandler defaultUplinkMsgHandlerTest;
/**
* Read {"id":"/3"}
* Read {"id":"/6"}...
@ -88,7 +86,7 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest
e.printStackTrace();
}
});
} catch (Exception e2){
} catch (Exception e2) {
e2.printStackTrace();
}
}
@ -99,10 +97,10 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest
* @throws Exception
*/
@Test
public void testReadAllInstancesInClientById_Result_CONTENT_Value_IsInstances_IsResources() throws Exception{
public void testReadAllInstancesInClientById_Result_CONTENT_Value_IsInstances_IsResources() throws Exception {
expectedObjectIdVerInstances.forEach(expected -> {
try {
String actualResult = sendRPCById((String) expected);
String actualResult = sendRPCById((String) expected);
String expectedObjectId = pathIdVerToObjectId((String) expected);
LwM2mPath expectedPath = new LwM2mPath(expectedObjectId);
ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class);
@ -122,7 +120,7 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest
*/
@Test
public void testReadMultipleResourceById_Result_CONTENT_Value_IsLwM2mMultipleResource() throws Exception {
String expectedIdVer = objectInstanceIdVer_3 +"/" + RESOURCE_ID_11;
String expectedIdVer = objectInstanceIdVer_3 + "/" + RESOURCE_ID_11;
String actualResult = sendRPCById(expectedIdVer);
ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
@ -135,7 +133,7 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest
*/
@Test
public void testReadSingleResourceById_Result_CONTENT_Value_IsLwM2mSingleResource() throws Exception {
String expectedIdVer = objectInstanceIdVer_3 +"/" + RESOURCE_ID_14;
String expectedIdVer = objectInstanceIdVer_3 + "/" + RESOURCE_ID_14;
String actualResult = sendRPCById(expectedIdVer);
ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
@ -161,7 +159,7 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest
*/
@Test
public void testReadCompositeSingleResourceByIds_Result_CONTENT_Value_IsObjectIsLwM2mSingleResourceIsLwM2mMultipleResource() throws Exception {
String expectedIdVer_1 = (String) expectedObjectIdVers.stream().filter(path -> (!((String)path).contains("/" + BINARY_APP_DATA_CONTAINER) && ((String)path).contains("/" + SERVER))).findFirst().get();
String expectedIdVer_1 = (String) expectedObjectIdVers.stream().filter(path -> (!((String) path).contains("/" + BINARY_APP_DATA_CONTAINER) && ((String) path).contains("/" + SERVER))).findFirst().get();
String objectId_1 = pathIdVerToObjectId(expectedIdVer_1);
String expectedIdVer3_0_1 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_1;
String expectedIdVer3_0_11 = objectInstanceIdVer_3 + "/" + RESOURCE_ID_11;
@ -221,8 +219,8 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest
String objectId_19 = pathIdVerToObjectId(objectIdVer_19);
String expected3_0_9 = objectInstanceId_3 + "/" + RESOURCE_ID_9 + "=LwM2mSingleResource [id=" + RESOURCE_ID_9 + ", value=";
String expected3_0_14 = objectInstanceId_3 + "/" + RESOURCE_ID_14 + "=LwM2mSingleResource [id=" + RESOURCE_ID_14 + ", value=";
String expected19_0_0 = objectId_19 + "/" + OBJECT_INSTANCE_ID_0 + "/" + RESOURCE_ID_0 + expectedKey19_X_0;
String expected19_1_0 = objectId_19 + "/" + OBJECT_INSTANCE_ID_1 + "/" + RESOURCE_ID_0 + expectedKey19_X_0;
String expected19_0_0 = objectId_19 + "/" + OBJECT_INSTANCE_ID_0 + "/" + RESOURCE_ID_0 + expectedKey19_X_0;
String expected19_1_0 = objectId_19 + "/" + OBJECT_INSTANCE_ID_1 + "/" + RESOURCE_ID_0 + expectedKey19_X_0;
String actualValues = rpcActualResult.get("value").asText();
assertTrue(actualValues.contains(expected3_0_9));
assertTrue(actualValues.contains(expected3_0_14));
@ -232,56 +230,55 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest
/**
* /3303/0/5700
* Read {"id":"/3303/0/5700"}
* Read {"id":"/3303/12/5700"}
* Trigger a Send operation from the client with multiple values for the same resource as a payload
* acked "[{"bn":"/3303/12/5700","bt":1724".. 116 bytes]
* 2 values for the resource /3303/12/5700 should be stored with timestamps1 = Instance.now(), timestamps2 = Instance.now()
*
* 2 values for the resource /3303/12/5700 should be stored with:
* - timestamps1 = Instance.now() + RESOURCE_ID_VALUE_3303_12_5700_1
* - timestamps2 = (timestamps1 + 3 sec) + RESOURCE_ID_VALUE_3303_12_5700_2
* @throws Exception
*/
@Test
public void testReadSingleResource_sendFromClient_CollectedValue() throws Exception {
TimestampedLwM2mNodes[] tsNodesHolder = new TimestampedLwM2mNodes[1];
doAnswer(inv -> {
tsNodesHolder[0] = inv.getArgument(1);
return null;
}).when(defaultUplinkMsgHandlerTest).onUpdateValueWithSendRequest(
Mockito.any(Registration.class),
Mockito.any(TimestampedLwM2mNodes.class)
);
// init test
long startTs = Instant.now().toEpochMilli();
int cntValues = 4;
int resourceId = 5700;
String expectedIdVer = objectIdVer_3303 + "/" + OBJECT_INSTANCE_ID_12 + "/" + resourceId;
String actualResult = sendRPCById(expectedIdVer);
verify(defaultUplinkMsgHandlerTest, timeout(10000).times(1))
.onUpdateValueWithSendRequest(Mockito.any(Registration.class), Mockito.any(TimestampedLwM2mNodes.class));
ObjectNode rpcActualResult = JacksonUtil.fromString(actualResult, ObjectNode.class);
assertEquals(ResponseCode.CONTENT.getName(), rpcActualResult.get("result").asText());
String expected = "LwM2mSingleResource [id=" + resourceId + ", value=";
String actual = rpcActualResult.get("value").asText();
assertTrue(actual.contains(expected));
int indStart = actual.indexOf(expected) + expected.length();
int indEnd = actual.indexOf(",", indStart);
String valStr = actual.substring(indStart, indEnd);
double dd = Double.parseDouble(valStr);
long combined = Double.doubleToRawLongBits(dd);
int t0 = (int) (combined >> 32);
int t1 = (int) combined;
double[] expectedValues ={(double)t0/100, (double)t1/100};
int ind = 0;
LwM2mPath expectedPath = new LwM2mPath("/3303/12/5700");
for (Instant ts : tsNodesHolder[0].getTimestamps()) {
Map<LwM2mPath, LwM2mNode> nodesAt = tsNodesHolder[0].getNodesAt(ts);
for (var instant : nodesAt.entrySet()) {
LwM2mPath actualPath = instant.getKey();
LwM2mNode node = instant.getValue();
LwM2mResource lwM2mResource = (LwM2mResource) node;
assertEquals(expectedPath, actualPath);
assertEquals(expectedValues[ind], lwM2mResource.getValue());
ind++;
sendRPCById(expectedIdVer);
// verify result read: verify count value: 1-2: send CollectedValue; 3 - response for read;
long endTs = Instant.now().toEpochMilli() + RESOURCE_ID_VALUE_3303_12_5700_DELTA_TS * 4;
String expectedVal_1 = String.valueOf(RESOURCE_ID_3303_12_5700_VALUE_0);
String expectedVal_2 = String.valueOf(RESOURCE_ID_3303_12_5700_VALUE_1);
AtomicReference<ObjectNode> actualValues = new AtomicReference<>();
await().atMost(40, SECONDS).until(() -> {
actualValues.set(doGetAsync(
"/api/plugins/telemetry/DEVICE/" + deviceId + "/values/timeseries?keys="
+ RESOURCE_ID_NAME_3303_12_5700
+ "&startTs=" + startTs
+ "&endTs=" + endTs
+ "&interval=0&limit=100&useStrictDataTypes=false",
ObjectNode.class));
// verify cntValues
return actualValues.get() != null && actualValues.get().get(RESOURCE_ID_NAME_3303_12_5700).size() == cntValues;
});
// verify ts
ArrayNode actual = (ArrayNode) actualValues.get().get(RESOURCE_ID_NAME_3303_12_5700);
Map<String, Long> keyTsMaps = new HashedMap();
for (JsonNode tsNode: actual) {
if (tsNode.get("value").asText().equals(expectedVal_1) || tsNode.get("value").asText().equals(expectedVal_2)) {
keyTsMaps.put(tsNode.get("value").asText(), tsNode.get("ts").asLong());
}
}
assertTrue(keyTsMaps.size() == 2);
long actualTS0 = keyTsMaps.get(expectedVal_1).longValue();
long actualTS1 = keyTsMaps.get(expectedVal_2).longValue();
assertTrue(actualTS0 > 0);
assertTrue(actualTS1 > 0);
assertTrue(actualTS1 > actualTS0);
assertTrue((actualTS1 - actualTS0) >= RESOURCE_ID_VALUE_3303_12_5700_DELTA_TS);
assertTrue(actualTS0 <= RESOURCE_ID_3303_12_5700_TS_0);
assertTrue(actualTS1 <= RESOURCE_ID_3303_12_5700_TS_1);
}
/**
@ -301,7 +298,6 @@ public class RpcLwm2mIntegrationReadTest extends AbstractRpcLwM2MIntegrationTest
assertEquals(actualValue, expectedValue);
}
private String sendRPCById(String path) throws Exception {
String setRpcRequest = "{\"method\": \"Read\", \"params\": {\"id\": \"" + path + "\"}}";
return doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setRpcRequest, String.class, status().isOk());

View File

@ -407,6 +407,7 @@ public class ProtoUtils {
.setRequestIdMSB(msg.getMsg().getId().getMostSignificantBits())
.setRequestIdLSB(msg.getMsg().getId().getLeastSignificantBits())
.setOneway(msg.getMsg().isOneway())
.setPersisted(msg.getMsg().isPersisted())
.build();
return TransportProtos.ToDeviceRpcRequestActorMsgProto.newBuilder()

View File

@ -133,7 +133,7 @@ public class LwM2MBootstrapSecurityStore implements BootstrapSecurityStore {
log.error(" [{}] Different values SecurityMode between of client and profile.", store.getEndpoint());
log.error("{} getParametersBootstrap: [{}] Different values SecurityMode between of client and profile.", LOG_LWM2M_ERROR, store.getEndpoint());
String logMsg = String.format("%s: Different values SecurityMode between of client and profile.", LOG_LWM2M_ERROR);
helper.sendParametersOnThingsboardTelemetry(helper.getKvStringtoThingsboard(LOG_LWM2M_TELEMETRY, logMsg), sessionInfo);
helper.sendParametersOnThingsboardTelemetry(helper.getKvStringtoThingsboard(LOG_LWM2M_TELEMETRY, logMsg), sessionInfo, null);
return null;
}
}

View File

@ -36,6 +36,7 @@ import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -58,12 +59,12 @@ public class LwM2mTransportServerHelper {
context.getTransportService().process(sessionInfo, postAttributeMsg, TransportServiceCallback.EMPTY);
}
public void sendParametersOnThingsboardTelemetry(List<TransportProtos.KeyValueProto> kvList, SessionInfoProto sessionInfo) {
sendParametersOnThingsboardTelemetry(kvList, sessionInfo, null);
public void sendParametersOnThingsboardTelemetry(List<TransportProtos.KeyValueProto> kvList, SessionInfoProto sessionInfo, @Nullable Map<String, AtomicLong> keyTsLatestMaps){
sendParametersOnThingsboardTelemetry(kvList, sessionInfo, keyTsLatestMaps, null);
}
public void sendParametersOnThingsboardTelemetry(List<TransportProtos.KeyValueProto> kvList, SessionInfoProto sessionInfo, @Nullable Map<String, AtomicLong> keyTsLatestMap) {
TransportProtos.TsKvListProto tsKvList = toTsKvList(kvList, keyTsLatestMap);
public void sendParametersOnThingsboardTelemetry(List<TransportProtos.KeyValueProto> kvList, SessionInfoProto sessionInfo, @Nullable Map<String, AtomicLong> keyTsLatestMap, @Nullable Instant ts) {
TransportProtos.TsKvListProto tsKvList = toTsKvList(kvList, keyTsLatestMap, ts);
PostTelemetryMsg postTelemetryMsg = PostTelemetryMsg.newBuilder()
.addTsKvList(tsKvList)
@ -72,9 +73,9 @@ public class LwM2mTransportServerHelper {
context.getTransportService().process(sessionInfo, postTelemetryMsg, TransportServiceCallback.EMPTY);
}
TransportProtos.TsKvListProto toTsKvList(List<TransportProtos.KeyValueProto> kvList, Map<String, AtomicLong> keyTsLatestMap) {
TransportProtos.TsKvListProto toTsKvList(List<TransportProtos.KeyValueProto> kvList, Map<String, AtomicLong> keyTsLatestMap, @Nullable Instant ts) {
return TransportProtos.TsKvListProto.newBuilder()
.setTs(getTs(kvList, keyTsLatestMap))
.setTs(ts == null ? getTs(kvList, keyTsLatestMap) : ts.toEpochMilli())
.addAllKv(kvList)
.build();
}

View File

@ -42,7 +42,6 @@ import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.request.CreateRequest;
import org.eclipse.leshan.core.request.ObserveRequest;
import org.eclipse.leshan.core.request.ReadRequest;
import org.eclipse.leshan.core.request.SendRequest;
import org.eclipse.leshan.core.request.WriteCompositeRequest;
import org.eclipse.leshan.core.request.WriteRequest;
import org.eclipse.leshan.core.request.WriteRequest.Mode;
@ -117,6 +116,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH;
@ -382,7 +382,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
this.updateObjectInstanceResourceValue(lwM2MClient, lwM2mObjectInstance, path.toString(), 0);
} else if (node instanceof LwM2mResource) {
LwM2mResource lwM2mResource = (LwM2mResource) node;
this.updateResourcesValue(lwM2MClient, lwM2mResource, path.toString(), Mode.UPDATE, 0);
this.updateResourcesValueWithTs(lwM2MClient, lwM2mResource, path.toString(), Mode.UPDATE, ts);
}
}
tryAwake(lwM2MClient);
@ -612,12 +612,21 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
otaService.onCurrentSoftwareResultUpdate(lwM2MClient, (Long) lwM2mResource.getValue());
}
if (ResponseCode.BAD_REQUEST.getCode() > code) {
this.updateAttrTelemetry(registration, path);
this.updateAttrTelemetry(registration, path, null);
}
} else {
log.error("Fail update path [{}] Resource [{}]", path, lwM2mResource);
}
}
private void updateResourcesValueWithTs(LwM2mClient lwM2MClient, LwM2mResource lwM2mResource, String stringPath, Mode mode, Instant ts) {
Registration registration = lwM2MClient.getRegistration();
String path = convertObjectIdToVersionedId(stringPath, lwM2MClient);
if (lwM2MClient.saveResourceValue(path, lwM2mResource, modelProvider, mode)) {
this.updateAttrTelemetry(registration, path, ts);
} else {
log.error("Fail update path [{}] Resource [{}] with ts.", path, lwM2mResource);
}
}
/**
@ -629,7 +638,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
*
* @param registration - Registration LwM2M Client
*/
public void updateAttrTelemetry(Registration registration, String path) {
public void updateAttrTelemetry(Registration registration, String path, Instant ts) {
log.trace("UpdateAttrTelemetry paths [{}]", path);
try {
ResultsAddKeyValueProto results = this.getParametersFromProfile(registration, path);
@ -640,8 +649,8 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
this.helper.sendParametersOnThingsboardAttribute(results.getResultAttributes(), sessionInfo);
}
if (results.getResultTelemetries().size() > 0) {
log.trace("UpdateTelemetry paths [{}] value [{}]", path, results.getResultTelemetries().get(0).toString());
this.helper.sendParametersOnThingsboardTelemetry(results.getResultTelemetries(), sessionInfo);
log.trace("UpdateTelemetry paths [{}] value [{}] ts [{}]", path, results.getResultTelemetries().get(0).toString(), ts == null ? "null" : ts.toEpochMilli());
this.helper.sendParametersOnThingsboardTelemetry(results.getResultTelemetries(), sessionInfo, null, ts);
}
}
} catch (Exception e) {

View File

@ -223,7 +223,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
var currentSettings = vcService.getRepositorySettings(ctx.getTenantId());
var newSettings = ctx.getSettings();
if (!newSettings.equals(currentSettings)) {
vcService.initRepository(ctx.getTenantId(), ctx.getSettings());
vcService.initRepository(ctx.getTenantId(), ctx.getSettings(), false);
}
if (msg.hasCommitRequest()) {
handleCommitRequest(ctx, msg.getCommitRequest());
@ -464,7 +464,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
private void handleInitRepositoryCommand(VersionControlRequestCtx ctx) {
try {
vcService.initRepository(ctx.getTenantId(), ctx.getSettings());
vcService.initRepository(ctx.getTenantId(), ctx.getSettings(), false);
reply(ctx, Optional.empty());
} catch (Exception e) {
log.debug("[{}] Failed to connect to the repository: ", ctx, e);
@ -564,4 +564,5 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
}, MoreExecutors.directExecutor());
}
}
}

View File

@ -205,7 +205,7 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
if (!Files.exists(Path.of(gitRepository.getDirectory()))) {
try {
return cloneRepository(tenantId, gitRepository.getSettings());
return openOrCloneRepository(tenantId, gitRepository.getSettings(), false);
} catch (Exception e) {
throw new IllegalStateException("Could not initialize the repository: " + e.getMessage(), e);
}
@ -239,11 +239,11 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
}
@Override
public void initRepository(TenantId tenantId, RepositorySettings settings) throws Exception {
public void initRepository(TenantId tenantId, RepositorySettings settings, boolean fetch) throws Exception {
if (!settings.isLocalOnly()) {
clearRepository(tenantId);
}
cloneRepository(tenantId, settings);
openOrCloneRepository(tenantId, settings, fetch);
}
@Override
@ -280,13 +280,16 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
return EntityIdFactory.getByTypeAndUuid(entityType, entityId);
}
private GitRepository cloneRepository(TenantId tenantId, RepositorySettings settings) throws Exception {
private GitRepository openOrCloneRepository(TenantId tenantId, RepositorySettings settings, boolean fetch) throws Exception {
log.debug("[{}] Init tenant repository started.", tenantId);
Path repositoryDirectory = Path.of(repositoriesFolder, settings.isLocalOnly() ? "local_" + settings.getRepositoryUri() : tenantId.getId().toString());
GitRepository repository;
if (Files.exists(repositoryDirectory)) {
repository = GitRepository.open(repositoryDirectory.toFile(), settings);
if (fetch) {
repository.fetch();
}
} else {
Files.createDirectories(repositoryDirectory);
if (settings.isLocalOnly()) {

View File

@ -42,7 +42,7 @@ public interface GitRepositoryService {
void testRepository(TenantId tenantId, RepositorySettings settings) throws Exception;
void initRepository(TenantId tenantId, RepositorySettings settings) throws Exception;
void initRepository(TenantId tenantId, RepositorySettings settings, boolean fetch) throws Exception;
RepositorySettings getRepositorySettings(TenantId tenantId) throws Exception;
@ -67,4 +67,5 @@ public interface GitRepositoryService {
String getContentsDiff(TenantId tenantId, String content1, String content2) throws IOException;
void fetch(TenantId tenantId) throws GitAPIException;
}

View File

@ -49,6 +49,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
@ -57,6 +58,7 @@ import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.security.DeviceCredentials;
@ -64,6 +66,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static io.restassured.RestAssured.given;
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
@ -320,6 +323,27 @@ public class TestRestClient {
.as(JsonNode.class);
}
public Rpc getPersistedRpc(RpcId rpcId) {
return given().spec(requestSpec)
.get("/api/rpc/persistent/{rpcId}", rpcId.toString())
.then()
.statusCode(HTTP_OK)
.extract()
.as(Rpc.class);
}
public PageData<Rpc> getPersistedRpcByDevice(DeviceId deviceId, PageLink pageLink) {
Map<String, String> params = new HashMap<>();
addPageLinkToParam(params, pageLink);
return given().spec(requestSpec).queryParams(params)
.get("/api/rpc/persistent/device/{deviceId}", deviceId.toString())
.then()
.statusCode(HTTP_OK)
.extract()
.as(new TypeRef<>() {
});
}
public PageData<DeviceProfile> getDeviceProfiles(PageLink pageLink) {
Map<String, String> params = new HashMap<>();
addPageLinkToParam(params, pageLink);

View File

@ -46,9 +46,11 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rule.NodeConnectionInfo;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
@ -68,6 +70,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
@ -76,6 +79,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import static org.thingsboard.server.common.data.DataConstants.DEVICE;
import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
@ -307,6 +311,60 @@ public class MqttClientTest extends AbstractContainerTest {
assertThat(serverResponse).isEqualTo(mapper.readTree(clientResponse.toString()));
}
@Test
public void serverSidePersistedRpc() throws Exception {
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
MqttMessageListener listener = new MqttMessageListener();
MqttClient mqttClient = getMqttClient(deviceCredentials, listener);
mqttClient.on("v1/devices/me/rpc/request/+", listener, MqttQoS.AT_LEAST_ONCE).get();
// Wait until subscription is processed
TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
// Send an RPC from the server
JsonObject serverRpcPayload = new JsonObject();
serverRpcPayload.addProperty("method", "getValue");
serverRpcPayload.addProperty("params", true);
serverRpcPayload.addProperty("persistent", true);
JsonNode persistentRpcId = testRestClient.postServerSideRpc(device.getId(), mapper.readTree(serverRpcPayload.toString()));
assertNotNull(persistentRpcId);
RpcId rpcId = new RpcId(UUID.fromString(persistentRpcId.get("rpcId").asText()));
// Wait for RPC call from the server and send the response
MqttEvent requestFromServer = listener.getEvents().poll(10 * timeoutMultiplier, TimeUnit.SECONDS);
assertThat(Objects.requireNonNull(requestFromServer).getMessage()).isEqualTo("{\"method\":\"getValue\",\"params\":true}");
Integer requestId = Integer.valueOf(Objects.requireNonNull(requestFromServer).getTopic().substring("v1/devices/me/rpc/request/".length()));
JsonObject clientResponse = new JsonObject();
clientResponse.addProperty("response", "someResponse");
// Send a response to the server's RPC request
mqttClient.publish("v1/devices/me/rpc/response/" + requestId, Unpooled.wrappedBuffer(clientResponse.toString().getBytes())).get();
PageLink pageLink = new PageLink(10);
Awaitility.await()
.pollInterval(500, TimeUnit.MILLISECONDS)
.atMost(5 * timeoutMultiplier, TimeUnit.SECONDS)
.until(() -> {
PageData<Rpc> rpcByDevice = testRestClient.getPersistedRpcByDevice(device.getId(), pageLink);
for (Rpc rpc : rpcByDevice.getData()) {
if (rpc.getId().equals(rpcId)) {
return true;
}
}
return false;
});
Rpc persistentRpc = testRestClient.getPersistedRpc(rpcId);
assertThat(persistentRpc.getResponse()).isEqualTo(mapper.readTree(clientResponse.toString()));
}
@Test
public void clientSideRpc() throws Exception {
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());

View File

@ -47,7 +47,7 @@ public class CertPemCredentials implements ClientCredentials {
protected String caCert;
private String cert;
private String privateKey;
private String password = "";
private String password;
@Override
public CredentialsType getType() {

View File

@ -38,7 +38,6 @@ import static org.thingsboard.rule.engine.credentials.CertPemCredentials.PRIVATE
public class CertPemCredentialsTest {
private static final String PASS = "test";
private static final String EMPTY_PASS = "";
private static final String RSA = "RSA";
private static final String EC = "EC";
@ -81,10 +80,10 @@ public class CertPemCredentialsTest {
private static Stream<Arguments> testLoadKeyStore() {
return Stream.of(
Arguments.of("pem/rsa_cert.pem", "pem/rsa_key.pem", EMPTY_PASS, RSA),
Arguments.of("pem/rsa_cert.pem", "pem/rsa_key.pem", null, RSA),
Arguments.of("pem/rsa_encrypted_cert.pem", "pem/rsa_encrypted_key.pem", PASS, RSA),
Arguments.of("pem/rsa_encrypted_traditional_cert.pem", "pem/rsa_encrypted_traditional_key.pem", PASS, RSA),
Arguments.of("pem/ec_cert.pem", "pem/ec_key.pem", EMPTY_PASS, EC)
Arguments.of("pem/ec_cert.pem", "pem/ec_key.pem", null, EC)
);
}
@ -98,7 +97,7 @@ public class CertPemCredentialsTest {
certPemCredentials.setPassword(password);
KeyStore keyStore = certPemCredentials.loadKeyStore();
Assertions.assertNotNull(keyStore);
Key key = keyStore.getKey(PRIVATE_KEY_ALIAS, password.toCharArray());
Key key = keyStore.getKey(PRIVATE_KEY_ALIAS, SslUtil.getPassword(password));
Assertions.assertNotNull(key);
Assertions.assertEquals(algorithm, key.getAlgorithm());

View File

@ -64,9 +64,13 @@ export interface AnalogueGaugeSettings {
animationRule: AnimationRule;
}
interface BaseGaugeModel extends BaseGauge {
_value?: number;
}
export abstract class TbBaseGauge<S, O extends GenericOptions> {
private gauge: BaseGauge;
private gauge: BaseGaugeModel;
protected constructor(protected ctx: WidgetContext, canvasId: string) {
const gaugeElement = $('#' + canvasId, ctx.$container)[0];
@ -77,16 +81,20 @@ export abstract class TbBaseGauge<S, O extends GenericOptions> {
protected abstract createGaugeOptions(gaugeElement: HTMLElement, settings: S): O;
protected abstract createGauge(gaugeData: O): BaseGauge;
protected abstract createGauge(gaugeData: O): BaseGaugeModel;
update() {
if (this.ctx.data.length > 0) {
const cellData = this.ctx.data[0];
if (cellData.data.length > 0) {
const tvPair = cellData.data[cellData.data.length -
1];
const tvPair = cellData.data[cellData.data.length - 1];
const value = parseFloat(tvPair[1]);
if (value !== this.gauge.value) {
if (!this.gauge.options.animation) {
this.gauge._value = value;
} else {
delete this.gauge._value;
}
this.gauge.value = value;
}
}

View File

@ -26,7 +26,6 @@ import { prepareFontSettings } from '@home/components/widget/lib/settings.models
import { CanvasDigitalGauge, CanvasDigitalGaugeOptions } from '@home/components/widget/lib/canvas-digital-gauge';
import { DatePipe } from '@angular/common';
import { IWidgetSubscription } from '@core/api/widget-api.models';
import { Subscription } from 'rxjs';
import { ColorProcessor, createValueSubscription, ValueSourceType } from '@shared/models/widget-settings.models';
import GenericOptions = CanvasGauges.GenericOptions;
@ -260,6 +259,8 @@ export class TbCanvasDigitalGauge {
if (value !== this.gauge.value) {
if (!this.gauge.options.animation) {
this.gauge._value = value;
} else {
delete this.gauge._value;
}
this.gauge.value = value;
} else if (this.localSettings.showTimestamp && this.gauge.timestamp !== timestamp) {