Merge remote-tracking branch 'origin/master' into develop/3.5
This commit is contained in:
commit
37c993b80d
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>thingsboard</artifactId>
|
||||
</parent>
|
||||
<artifactId>application</artifactId>
|
||||
|
||||
@ -51,6 +51,7 @@ CREATE TABLE IF NOT EXISTS audit_log (
|
||||
action_failure_details varchar(1000000)
|
||||
) PARTITION BY RANGE (created_time);
|
||||
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_audit_log_id ON audit_log(id);
|
||||
|
||||
CREATE OR REPLACE PROCEDURE migrate_audit_logs(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT)
|
||||
LANGUAGE plpgsql AS
|
||||
@ -111,6 +112,7 @@ CREATE TABLE IF NOT EXISTS edge_event (
|
||||
ts bigint NOT NULL
|
||||
) PARTITION BY RANGE (created_time);
|
||||
CREATE INDEX IF NOT EXISTS idx_edge_event_tenant_id_and_created_time ON edge_event(tenant_id, created_time DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_edge_event_id ON edge_event(id);
|
||||
|
||||
CREATE OR REPLACE PROCEDURE migrate_edge_event(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT)
|
||||
LANGUAGE plpgsql AS
|
||||
|
||||
@ -661,10 +661,8 @@ public class TelemetryController extends BaseController {
|
||||
logAttributesDeleted(user, entityId, scope, keys, null);
|
||||
if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) {
|
||||
DeviceId deviceId = new DeviceId(entityId.getId());
|
||||
Set<AttributeKey> keysToNotify = new HashSet<>();
|
||||
keys.forEach(key -> keysToNotify.add(new AttributeKey(scope, key)));
|
||||
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
|
||||
user.getTenantId(), deviceId, keysToNotify), null);
|
||||
user.getTenantId(), deviceId, scope, keys), null);
|
||||
}
|
||||
result.setResult(new ResponseEntity<>(HttpStatus.OK));
|
||||
}
|
||||
|
||||
@ -187,10 +187,7 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
public void startSyncProcess(TenantId tenantId, EdgeId edgeId, boolean fullSync) {
|
||||
log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId);
|
||||
syncCompleted = false;
|
||||
if (sessionState.getSendDownlinkMsgsFuture() != null && sessionState.getSendDownlinkMsgsFuture().isDone()) {
|
||||
String errorMsg = String.format("[%s][%s] Sync process started. General processing interrupted!", tenantId, edgeId);
|
||||
sessionState.getSendDownlinkMsgsFuture().setException(new RuntimeException(errorMsg));
|
||||
}
|
||||
interruptGeneralProcessingOnSync(tenantId, edgeId);
|
||||
doSync(new EdgeSyncCursor(ctx, edge, fullSync));
|
||||
}
|
||||
|
||||
@ -265,10 +262,7 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
}
|
||||
if (sessionState.getPendingMsgsMap().isEmpty()) {
|
||||
log.debug("[{}] Pending msgs map is empty. Stopping current iteration", edge.getRoutingKey());
|
||||
if (sessionState.getScheduledSendDownlinkTask() != null) {
|
||||
sessionState.getScheduledSendDownlinkTask().cancel(false);
|
||||
}
|
||||
sessionState.getSendDownlinkMsgsFuture().set(null);
|
||||
stopCurrentSendDownlinkMsgsTask(null);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e);
|
||||
@ -391,15 +385,14 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
}
|
||||
|
||||
private ListenableFuture<Void> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) {
|
||||
if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone()) {
|
||||
String errorMsg = "[" + this.sessionId + "] Previous send downlink future was not properly completed, stopping it now";
|
||||
log.error(errorMsg);
|
||||
sessionState.getSendDownlinkMsgsFuture().setException(new RuntimeException(errorMsg));
|
||||
}
|
||||
interruptPreviousSendDownlinkMsgsTask();
|
||||
|
||||
sessionState.setSendDownlinkMsgsFuture(SettableFuture.create());
|
||||
sessionState.getPendingMsgsMap().clear();
|
||||
|
||||
downlinkMsgsPack.forEach(msg -> sessionState.getPendingMsgsMap().put(msg.getDownlinkMsgId(), msg));
|
||||
scheduleDownlinkMsgsPackSend(1);
|
||||
|
||||
return sessionState.getSendDownlinkMsgsFuture();
|
||||
}
|
||||
|
||||
@ -422,13 +415,13 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
} else {
|
||||
log.warn("[{}] Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}",
|
||||
this.sessionId, MAX_DOWNLINK_ATTEMPTS, copy);
|
||||
sessionState.getSendDownlinkMsgsFuture().set(null);
|
||||
stopCurrentSendDownlinkMsgsTask(null);
|
||||
}
|
||||
} else {
|
||||
sessionState.getSendDownlinkMsgsFuture().set(null);
|
||||
stopCurrentSendDownlinkMsgsTask(null);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
sessionState.getSendDownlinkMsgsFuture().setException(e);
|
||||
stopCurrentSendDownlinkMsgsTask(e);
|
||||
}
|
||||
};
|
||||
|
||||
@ -673,4 +666,29 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
log.debug("[{}] Failed to close output stream: {}", sessionId, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void interruptPreviousSendDownlinkMsgsTask() {
|
||||
String msg = String.format("[%s] Previous send downlink future was not properly completed, stopping it now!", this.sessionId);
|
||||
stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg));
|
||||
}
|
||||
|
||||
private void interruptGeneralProcessingOnSync(TenantId tenantId, EdgeId edgeId) {
|
||||
String msg = String.format("[%s][%s] Sync process started. General processing interrupted!", tenantId, edgeId);
|
||||
stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg));
|
||||
}
|
||||
|
||||
public void stopCurrentSendDownlinkMsgsTask(Exception e) {
|
||||
if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone()) {
|
||||
if (e != null) {
|
||||
log.warn(e.getMessage(), e);
|
||||
sessionState.getSendDownlinkMsgsFuture().setException(e);
|
||||
} else {
|
||||
sessionState.getSendDownlinkMsgsFuture().set(null);
|
||||
}
|
||||
}
|
||||
if (sessionState.getScheduledSendDownlinkTask() != null) {
|
||||
sessionState.getScheduledSendDownlinkTask().cancel(true);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ import com.google.common.util.concurrent.SettableFuture;
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
@ -26,7 +27,7 @@ import java.util.concurrent.ScheduledFuture;
|
||||
@Data
|
||||
public class EdgeSessionState {
|
||||
|
||||
private final Map<Integer, DownlinkMsg> pendingMsgsMap = new LinkedHashMap<>();
|
||||
private final Map<Integer, DownlinkMsg> pendingMsgsMap = Collections.synchronizedMap(new LinkedHashMap<>());
|
||||
private SettableFuture<Void> sendDownlinkMsgsFuture;
|
||||
private ScheduledFuture<?> scheduledSendDownlinkTask;
|
||||
}
|
||||
|
||||
@ -287,12 +287,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
List<String> attributeNames = attributeDeleteMsg.getAttributeNamesList();
|
||||
attributesService.removeAll(tenantId, entityId, scope, attributeNames);
|
||||
if (EntityType.DEVICE.name().equals(entityType)) {
|
||||
Set<AttributeKey> attributeKeys = new HashSet<>();
|
||||
for (String attributeName : attributeNames) {
|
||||
attributeKeys.add(new AttributeKey(scope, attributeName));
|
||||
}
|
||||
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
|
||||
tenantId, (DeviceId) entityId, attributeKeys), new TbQueueCallback() {
|
||||
tenantId, (DeviceId) entityId, scope, attributeNames), new TbQueueCallback() {
|
||||
@Override
|
||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||
futureToSet.set(null);
|
||||
|
||||
@ -181,10 +181,11 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
|
||||
|
||||
private void update(TenantId tenantId, DeviceProfile deviceProfile, OtaPackageType otaPackageType) {
|
||||
Consumer<Device> updateConsumer;
|
||||
OtaPackageId packageId = OtaPackageUtil.getOtaPackageId(deviceProfile, otaPackageType);
|
||||
|
||||
if (deviceProfile.getFirmwareId() != null) {
|
||||
if (packageId != null) {
|
||||
long ts = System.currentTimeMillis();
|
||||
updateConsumer = d -> send(d.getTenantId(), d.getId(), deviceProfile.getFirmwareId(), ts, otaPackageType);
|
||||
updateConsumer = d -> send(d.getTenantId(), d.getId(), packageId, ts, otaPackageType);
|
||||
} else {
|
||||
updateConsumer = d -> remove(d, otaPackageType);
|
||||
}
|
||||
@ -360,9 +361,7 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Void tmp) {
|
||||
log.trace("[{}] Success remove target {} attributes!", device.getId(), otaPackageType);
|
||||
Set<AttributeKey> keysToNotify = new HashSet<>();
|
||||
attributesKeys.forEach(key -> keysToNotify.add(new AttributeKey(DataConstants.SHARED_SCOPE, key)));
|
||||
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(device.getTenantId(), device.getId(), keysToNotify), null);
|
||||
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(device.getTenantId(), device.getId(), DataConstants.SHARED_SCOPE, attributesKeys), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -88,10 +88,8 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
|
||||
@Value("${cluster.stats.enabled:false}")
|
||||
private boolean statsEnabled;
|
||||
@Value("${edges.enabled}")
|
||||
@Value("${edges.enabled:true}")
|
||||
protected boolean edgesEnabled;
|
||||
@Value("${service.type:monolith}")
|
||||
private String serviceType;
|
||||
|
||||
private final AtomicInteger toCoreMsgs = new AtomicInteger(0);
|
||||
private final AtomicInteger toCoreNfs = new AtomicInteger(0);
|
||||
@ -584,25 +582,27 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
}
|
||||
|
||||
private void doSendQueueNotifications(ToRuleEngineNotificationMsg ruleEngineMsg, ToCoreNotificationMsg coreMsg, ToTransportMsg transportMsg) {
|
||||
Set<TransportProtos.ServiceInfo> tbRuleEngineServices = partitionService.getAllServices(ServiceType.TB_RULE_ENGINE);
|
||||
for (TransportProtos.ServiceInfo ruleEngineService : tbRuleEngineServices) {
|
||||
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngineService.getServiceId());
|
||||
Set<String> tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE);
|
||||
Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
|
||||
Set<String> tbTransportServices = partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT);
|
||||
// No need to push notifications twice
|
||||
tbTransportServices.removeAll(tbCoreServices);
|
||||
tbCoreServices.removeAll(tbRuleEngineServices);
|
||||
|
||||
for (String ruleEngineServiceId : tbRuleEngineServices) {
|
||||
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngineServiceId);
|
||||
producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), ruleEngineMsg), null);
|
||||
toRuleEngineNfs.incrementAndGet();
|
||||
}
|
||||
if (!serviceType.equals("monolith")) {
|
||||
Set<TransportProtos.ServiceInfo> tbCoreServices = partitionService.getAllServices(ServiceType.TB_CORE);
|
||||
for (TransportProtos.ServiceInfo coreService : tbCoreServices) {
|
||||
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, coreService.getServiceId());
|
||||
producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), coreMsg), null);
|
||||
toCoreNfs.incrementAndGet();
|
||||
}
|
||||
Set<TransportProtos.ServiceInfo> tbTransportServices = partitionService.getAllServices(ServiceType.TB_TRANSPORT);
|
||||
for (TransportProtos.ServiceInfo transportService : tbTransportServices) {
|
||||
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportService.getServiceId());
|
||||
producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null);
|
||||
toTransportNfs.incrementAndGet();
|
||||
}
|
||||
for (String coreServiceId : tbCoreServices) {
|
||||
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, coreServiceId);
|
||||
producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), coreMsg), null);
|
||||
toCoreNfs.incrementAndGet();
|
||||
}
|
||||
for (String transportServiceId : tbTransportServices) {
|
||||
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId);
|
||||
producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null);
|
||||
toTransportNfs.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -490,7 +490,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
subscriptionManagerService.onAttributesDelete(
|
||||
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
|
||||
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
|
||||
proto.getScope(), proto.getKeysList(), callback);
|
||||
proto.getScope(), proto.getKeysList(), proto.getNotifyDevice(), callback);
|
||||
} else if (msg.hasTsDelete()) {
|
||||
TbTimeSeriesDeleteProto proto = msg.getTsDelete();
|
||||
subscriptionManagerService.onTimeSeriesDelete(
|
||||
|
||||
@ -64,7 +64,7 @@ public class RuleNodeTbelScriptEngine extends RuleNodeScriptEngine<TbelInvokeSer
|
||||
List<TbMsg> res = new ArrayList<>();
|
||||
for (Object resObject : (Collection) result) {
|
||||
if (resObject instanceof Map) {
|
||||
res.add(unbindMsg((Map) result, msg));
|
||||
res.add(unbindMsg((Map) resObject, msg));
|
||||
} else {
|
||||
return wrongResultType(resObject);
|
||||
}
|
||||
|
||||
@ -326,7 +326,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback callback) {
|
||||
public void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, TbCallback callback) {
|
||||
onLocalTelemetrySubUpdate(entityId,
|
||||
s -> {
|
||||
if (TbSubscriptionType.ATTRIBUTES.equals(s.getType())) {
|
||||
@ -349,7 +349,13 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
||||
return subscriptionUpdate;
|
||||
}, false);
|
||||
if (entityId.getEntityType() == EntityType.DEVICE) {
|
||||
deleteDeviceInactivityTimeout(tenantId, entityId, keys);
|
||||
if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)
|
||||
|| TbAttributeSubscriptionScope.ANY_SCOPE.name().equalsIgnoreCase(scope)) {
|
||||
deleteDeviceInactivityTimeout(tenantId, entityId, keys);
|
||||
} else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) {
|
||||
clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(tenantId,
|
||||
new DeviceId(entityId.getId()), scope, keys), null);
|
||||
}
|
||||
}
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
@ -38,7 +38,7 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio
|
||||
|
||||
void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, TbCallback callback);
|
||||
|
||||
void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback empty);
|
||||
void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, TbCallback empty);
|
||||
|
||||
void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback);
|
||||
|
||||
|
||||
@ -236,7 +236,7 @@ public class TbSubscriptionUtils {
|
||||
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
|
||||
}
|
||||
|
||||
public static ToCoreMsg toAttributesDeleteProto(TenantId tenantId, EntityId entityId, String scope, List<String> keys) {
|
||||
public static ToCoreMsg toAttributesDeleteProto(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice) {
|
||||
TbAttributeDeleteProto.Builder builder = TbAttributeDeleteProto.newBuilder();
|
||||
builder.setEntityType(entityId.getEntityType().name());
|
||||
builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
|
||||
@ -245,6 +245,7 @@ public class TbSubscriptionUtils {
|
||||
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
|
||||
builder.setScope(scope);
|
||||
builder.addAllKeys(keys);
|
||||
builder.setNotifyDevice(notifyDevice);
|
||||
|
||||
SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
|
||||
msgBuilder.setAttrDelete(builder);
|
||||
|
||||
@ -271,14 +271,20 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
||||
@Override
|
||||
public void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback) {
|
||||
checkInternalEntity(entityId);
|
||||
deleteAndNotifyInternal(tenantId, entityId, scope, keys, callback);
|
||||
deleteAndNotifyInternal(tenantId, entityId, scope, keys, false, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback) {
|
||||
public void deleteAndNotify(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback) {
|
||||
checkInternalEntity(entityId);
|
||||
deleteAndNotifyInternal(tenantId, entityId, scope, keys, notifyDevice, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback) {
|
||||
ListenableFuture<List<String>> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys);
|
||||
addVoidCallback(deleteFuture, callback);
|
||||
addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys));
|
||||
addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys, notifyDevice));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -382,16 +388,16 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
||||
}
|
||||
}
|
||||
|
||||
private void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys) {
|
||||
private void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice) {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
|
||||
if (currentPartitions.contains(tpi)) {
|
||||
if (subscriptionManagerService.isPresent()) {
|
||||
subscriptionManagerService.get().onAttributesDelete(tenantId, entityId, scope, keys, TbCallback.EMPTY);
|
||||
subscriptionManagerService.get().onAttributesDelete(tenantId, entityId, scope, keys, notifyDevice, TbCallback.EMPTY);
|
||||
} else {
|
||||
log.warn("Possible misconfiguration because subscriptionManagerService is null!");
|
||||
}
|
||||
} else {
|
||||
TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAttributesDeleteProto(tenantId, entityId, scope, keys);
|
||||
TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAttributesDeleteProto(tenantId, entityId, scope, keys, notifyDevice);
|
||||
clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,7 +37,7 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService {
|
||||
|
||||
void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
|
||||
|
||||
void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback);
|
||||
void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback);
|
||||
|
||||
void deleteLatestInternal(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);
|
||||
|
||||
|
||||
@ -930,7 +930,7 @@ edges:
|
||||
storage:
|
||||
max_read_records_count: "${EDGES_STORAGE_MAX_READ_RECORDS_COUNT:50}"
|
||||
no_read_records_sleep: "${EDGES_NO_READ_RECORDS_SLEEP:1000}"
|
||||
sleep_between_batches: "${EDGES_SLEEP_BETWEEN_BATCHES:1000}"
|
||||
sleep_between_batches: "${EDGES_SLEEP_BETWEEN_BATCHES:10000}"
|
||||
scheduler_pool_size: "${EDGES_SCHEDULER_POOL_SIZE:1}"
|
||||
send_scheduler_pool_size: "${EDGES_SEND_SCHEDULER_POOL_SIZE:1}"
|
||||
grpc_callback_thread_pool_size: "${EDGES_GRPC_CALLBACK_POOL_SIZE:1}"
|
||||
|
||||
@ -88,6 +88,7 @@ import org.thingsboard.server.service.security.auth.jwt.RefreshTokenRequest;
|
||||
import org.thingsboard.server.service.security.auth.rest.LoginRequest;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@ -768,4 +769,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
|
||||
throw new AssertionError("Unexpected status " + mvcResult.getResponse().getStatus());
|
||||
}
|
||||
|
||||
protected <T> T getFieldValue(Object target, String fieldName) throws Exception {
|
||||
Field field = target.getClass().getDeclaredField(fieldName);
|
||||
field.setAccessible(true);
|
||||
return (T) field.get(target);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -44,6 +44,7 @@ import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.verify;
|
||||
@ -202,6 +203,21 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenSavingAuditLogAndPartitionSaveErrorOccurred_thenSaveAuditLogAnyway() throws Exception {
|
||||
// creating partition bigger than sql.audit_logs.partition_size
|
||||
partitioningRepository.createPartitionIfNotExists("audit_log", System.currentTimeMillis(), TimeUnit.DAYS.toMillis(7));
|
||||
List<Long> partitions = partitioningRepository.fetchPartitions("audit_log");
|
||||
assertThat(partitions).size().isOne();
|
||||
partitioningRepository.cleanupPartitionsCache("audit_log", System.currentTimeMillis(), 0);
|
||||
|
||||
assertDoesNotThrow(() -> {
|
||||
// expecting partition overlap error on partition save
|
||||
createAuditLog(ActionType.LOGIN, tenantAdminUserId);
|
||||
});
|
||||
assertThat(partitioningRepository.fetchPartitions("audit_log")).isEqualTo(partitions);
|
||||
}
|
||||
|
||||
private AuditLog createAuditLog(ActionType actionType, EntityId entityId) {
|
||||
AuditLog auditLog = new AuditLog();
|
||||
auditLog.setTenantId(tenantId);
|
||||
|
||||
@ -439,9 +439,9 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
|
||||
Assert.assertTrue(edgeImitator.waitForResponses());
|
||||
Assert.assertTrue(edgeImitator.waitForMessages());
|
||||
|
||||
AbstractMessage latestMessage = edgeImitator.getMessageFromTail(2);
|
||||
Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg);
|
||||
DeviceUpdateMsg latestDeviceUpdateMsg = (DeviceUpdateMsg) latestMessage;
|
||||
Optional<DeviceUpdateMsg> deviceUpdateMsgOpt = edgeImitator.findMessageByType(DeviceUpdateMsg.class);
|
||||
Assert.assertTrue(deviceUpdateMsgOpt.isPresent());
|
||||
DeviceUpdateMsg latestDeviceUpdateMsg = deviceUpdateMsgOpt.get();
|
||||
Assert.assertNotEquals(deviceOnCloudName, latestDeviceUpdateMsg.getName());
|
||||
Assert.assertEquals(deviceOnCloudName, latestDeviceUpdateMsg.getConflictName());
|
||||
|
||||
@ -453,9 +453,9 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
|
||||
Assert.assertNotNull(device);
|
||||
Assert.assertNotEquals(deviceOnCloudName, device.getName());
|
||||
|
||||
latestMessage = edgeImitator.getLatestMessage();
|
||||
Assert.assertTrue(latestMessage instanceof DeviceCredentialsRequestMsg);
|
||||
DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = (DeviceCredentialsRequestMsg) latestMessage;
|
||||
Optional<DeviceCredentialsRequestMsg> deviceCredentialsUpdateMsgOpt = edgeImitator.findMessageByType(DeviceCredentialsRequestMsg.class);
|
||||
Assert.assertTrue(deviceCredentialsUpdateMsgOpt.isPresent());
|
||||
DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = deviceCredentialsUpdateMsgOpt.get();
|
||||
Assert.assertEquals(uuid.getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB());
|
||||
Assert.assertEquals(uuid.getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB());
|
||||
|
||||
|
||||
@ -34,7 +34,7 @@ abstract public class BaseTelemetryEdgeTest extends AbstractEdgeTest {
|
||||
|
||||
@Test
|
||||
public void testTimeseriesWithFailures() throws Exception {
|
||||
int numberOfTimeseriesToSend = 1000;
|
||||
int numberOfTimeseriesToSend = 333;
|
||||
|
||||
Device device = findDeviceByName("Edge Device 1");
|
||||
|
||||
|
||||
@ -365,11 +365,7 @@ public class EdgeImitator {
|
||||
}
|
||||
|
||||
public AbstractMessage getLatestMessage() {
|
||||
return getMessageFromTail(1);
|
||||
}
|
||||
|
||||
public AbstractMessage getMessageFromTail(int offset) {
|
||||
return downlinkMsgs.get(downlinkMsgs.size() - offset);
|
||||
return downlinkMsgs.get(downlinkMsgs.size() - 1);
|
||||
}
|
||||
|
||||
public void ignoreType(Class<? extends AbstractMessage> type) {
|
||||
|
||||
@ -0,0 +1,247 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.queue;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||
import org.springframework.boot.test.mock.mockito.SpyBean;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.id.QueueId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.queue.Queue;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.NotificationsTopicService;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.server.service.gateway_device.GatewayNotificationsService;
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.isNull;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@Slf4j
|
||||
@RunWith(SpringRunner.class)
|
||||
@ContextConfiguration(classes = DefaultTbClusterService.class)
|
||||
public class DefaultTbClusterServiceTest {
|
||||
|
||||
public static final String MONOLITH = "monolith";
|
||||
|
||||
public static final String CORE = "core";
|
||||
|
||||
public static final String RULE_ENGINE = "rule_engine";
|
||||
|
||||
public static final String TRANSPORT = "transport";
|
||||
|
||||
@MockBean
|
||||
protected DataDecodingEncodingService encodingService;
|
||||
@MockBean
|
||||
protected TbDeviceProfileCache deviceProfileCache;
|
||||
@MockBean
|
||||
protected TbAssetProfileCache assetProfileCache;
|
||||
@MockBean
|
||||
protected GatewayNotificationsService gatewayNotificationsService;
|
||||
@MockBean
|
||||
protected PartitionService partitionService;
|
||||
@MockBean
|
||||
protected TbQueueProducerProvider producerProvider;
|
||||
|
||||
@SpyBean
|
||||
protected NotificationsTopicService notificationsTopicService;
|
||||
@SpyBean
|
||||
protected TbClusterService clusterService;
|
||||
|
||||
@Test
|
||||
public void testOnQueueChangeSingleMonolith() {
|
||||
when(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE)).thenReturn(Sets.newHashSet(MONOLITH));
|
||||
when(partitionService.getAllServiceIds(ServiceType.TB_CORE)).thenReturn(Sets.newHashSet(MONOLITH));
|
||||
when(partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT)).thenReturn(Sets.newHashSet(MONOLITH));
|
||||
|
||||
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> tbQueueProducer = mock(TbQueueProducer.class);
|
||||
|
||||
when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbQueueProducer);
|
||||
|
||||
clusterService.onQueueChange(createTestQueue());
|
||||
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH);
|
||||
verify(notificationsTopicService, never()).getNotificationsTopic(eq(ServiceType.TB_CORE), any());
|
||||
verify(notificationsTopicService, never()).getNotificationsTopic(eq(ServiceType.TB_TRANSPORT), any());
|
||||
|
||||
verify(tbQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH)), any(TbProtoQueueMsg.class), isNull());
|
||||
|
||||
verify(producerProvider, never()).getTbCoreNotificationsMsgProducer();
|
||||
verify(producerProvider, never()).getTransportNotificationsMsgProducer();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnQueueChangeMultipleMonoliths() {
|
||||
String monolith1 = MONOLITH + 1;
|
||||
String monolith2 = MONOLITH + 2;
|
||||
when(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE)).thenReturn(Sets.newHashSet(monolith1, monolith2));
|
||||
when(partitionService.getAllServiceIds(ServiceType.TB_CORE)).thenReturn(Sets.newHashSet(monolith1, monolith2));
|
||||
when(partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT)).thenReturn(Sets.newHashSet(monolith1, monolith2));
|
||||
|
||||
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> tbQueueProducer = mock(TbQueueProducer.class);
|
||||
|
||||
when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbQueueProducer);
|
||||
|
||||
clusterService.onQueueChange(createTestQueue());
|
||||
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1);
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2);
|
||||
verify(notificationsTopicService, never()).getNotificationsTopic(eq(ServiceType.TB_CORE), any());
|
||||
verify(notificationsTopicService, never()).getNotificationsTopic(eq(ServiceType.TB_TRANSPORT), any());
|
||||
|
||||
verify(tbQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1)), any(TbProtoQueueMsg.class), isNull());
|
||||
verify(tbQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2)), any(TbProtoQueueMsg.class), isNull());
|
||||
|
||||
verify(producerProvider, never()).getTbCoreNotificationsMsgProducer();
|
||||
verify(producerProvider, never()).getTransportNotificationsMsgProducer();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnQueueChangeSingleMonolithAndSingleRemoteTransport() {
|
||||
when(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE)).thenReturn(Sets.newHashSet(MONOLITH));
|
||||
when(partitionService.getAllServiceIds(ServiceType.TB_CORE)).thenReturn(Sets.newHashSet(MONOLITH));
|
||||
when(partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT)).thenReturn(Sets.newHashSet(MONOLITH, TRANSPORT));
|
||||
|
||||
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> tbREQueueProducer = mock(TbQueueProducer.class);
|
||||
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> tbTransportQueueProducer = mock(TbQueueProducer.class);
|
||||
|
||||
when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbREQueueProducer);
|
||||
when(producerProvider.getTransportNotificationsMsgProducer()).thenReturn(tbTransportQueueProducer);
|
||||
|
||||
clusterService.onQueueChange(createTestQueue());
|
||||
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH);
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_TRANSPORT, TRANSPORT);
|
||||
verify(notificationsTopicService, never()).getNotificationsTopic(eq(ServiceType.TB_CORE), any());
|
||||
|
||||
verify(tbREQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH)), any(TbProtoQueueMsg.class), isNull());
|
||||
|
||||
verify(tbTransportQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, TRANSPORT)), any(TbProtoQueueMsg.class), isNull());
|
||||
verify(tbTransportQueueProducer, never())
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, MONOLITH)), any(TbProtoQueueMsg.class), isNull());
|
||||
|
||||
verify(tbTransportQueueProducer, never())
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, MONOLITH)), any(TbProtoQueueMsg.class), isNull());
|
||||
|
||||
verify(producerProvider, never()).getTbCoreNotificationsMsgProducer();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnQueueChangeMultipleMicroservices() {
|
||||
String monolith1 = MONOLITH + 1;
|
||||
String monolith2 = MONOLITH + 2;
|
||||
|
||||
String core1 = CORE + 1;
|
||||
String core2 = CORE + 2;
|
||||
|
||||
String ruleEngine1 = RULE_ENGINE + 1;
|
||||
String ruleEngine2 = RULE_ENGINE + 2;
|
||||
|
||||
String transport1 = TRANSPORT + 1;
|
||||
String transport2 = TRANSPORT + 2;
|
||||
|
||||
when(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE)).thenReturn(Sets.newHashSet(monolith1, monolith2, ruleEngine1, ruleEngine2));
|
||||
when(partitionService.getAllServiceIds(ServiceType.TB_CORE)).thenReturn(Sets.newHashSet(monolith1, monolith2, core1, core2));
|
||||
when(partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT)).thenReturn(Sets.newHashSet(monolith1, monolith2, transport1, transport2));
|
||||
|
||||
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> tbREQueueProducer = mock(TbQueueProducer.class);
|
||||
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> tbCoreQueueProducer = mock(TbQueueProducer.class);
|
||||
TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> tbTransportQueueProducer = mock(TbQueueProducer.class);
|
||||
|
||||
when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbREQueueProducer);
|
||||
when(producerProvider.getTbCoreNotificationsMsgProducer()).thenReturn(tbCoreQueueProducer);
|
||||
when(producerProvider.getTransportNotificationsMsgProducer()).thenReturn(tbTransportQueueProducer);
|
||||
|
||||
clusterService.onQueueChange(createTestQueue());
|
||||
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1);
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2);
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngine1);
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngine2);
|
||||
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_CORE, core1);
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_CORE, core2);
|
||||
verify(notificationsTopicService, never()).getNotificationsTopic(ServiceType.TB_CORE, monolith1);
|
||||
verify(notificationsTopicService, never()).getNotificationsTopic(ServiceType.TB_CORE, monolith2);
|
||||
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_TRANSPORT, transport1);
|
||||
verify(notificationsTopicService, times(1)).getNotificationsTopic(ServiceType.TB_TRANSPORT, transport2);
|
||||
verify(notificationsTopicService, never()).getNotificationsTopic(ServiceType.TB_TRANSPORT, monolith1);
|
||||
verify(notificationsTopicService, never()).getNotificationsTopic(ServiceType.TB_TRANSPORT, monolith2);
|
||||
|
||||
verify(tbREQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1)), any(TbProtoQueueMsg.class), isNull());
|
||||
verify(tbREQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2)), any(TbProtoQueueMsg.class), isNull());
|
||||
verify(tbREQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngine1)), any(TbProtoQueueMsg.class), isNull());
|
||||
verify(tbREQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, ruleEngine2)), any(TbProtoQueueMsg.class), isNull());
|
||||
|
||||
verify(tbCoreQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, core1)), any(TbProtoQueueMsg.class), isNull());
|
||||
verify(tbCoreQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, core2)), any(TbProtoQueueMsg.class), isNull());
|
||||
verify(tbCoreQueueProducer, never())
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, monolith1)), any(TbProtoQueueMsg.class), isNull());
|
||||
verify(tbCoreQueueProducer, never())
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, monolith2)), any(TbProtoQueueMsg.class), isNull());
|
||||
|
||||
verify(tbTransportQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transport1)), any(TbProtoQueueMsg.class), isNull());
|
||||
verify(tbTransportQueueProducer, times(1))
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transport2)), any(TbProtoQueueMsg.class), isNull());
|
||||
verify(tbTransportQueueProducer, never())
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, monolith1)), any(TbProtoQueueMsg.class), isNull());
|
||||
verify(tbTransportQueueProducer, never())
|
||||
.send(eq(notificationsTopicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, monolith2)), any(TbProtoQueueMsg.class), isNull());
|
||||
}
|
||||
|
||||
protected Queue createTestQueue() {
|
||||
TenantId tenantId = TenantId.SYS_TENANT_ID;
|
||||
Queue queue = new Queue(new QueueId(UUID.randomUUID()));
|
||||
queue.setTenantId(tenantId);
|
||||
queue.setName("Main");
|
||||
queue.setTopic("main");
|
||||
queue.setPartitions(10);
|
||||
return queue;
|
||||
}
|
||||
}
|
||||
@ -70,7 +70,7 @@ class NashornJsInvokeServiceTest extends AbstractControllerTest {
|
||||
}
|
||||
long duration = System.currentTimeMillis() - startTs;
|
||||
System.out.println(iterations + " invocations took: " + duration + "ms");
|
||||
Assert.assertTrue(duration < TimeUnit.MINUTES.toMillis(3));
|
||||
Assert.assertTrue(duration < TimeUnit.MINUTES.toMillis(4));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@ -31,7 +31,6 @@ import org.thingsboard.server.controller.AbstractControllerTest;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -217,10 +216,4 @@ class TbelInvokeServiceTest extends AbstractControllerTest {
|
||||
return invokeService.invokeScript(TenantId.SYS_TENANT_ID, null, scriptId, msg, "{}", "POST_TELEMETRY_REQUEST").get().toString();
|
||||
}
|
||||
|
||||
private <T> T getFieldValue(Object target, String fieldName) throws Exception {
|
||||
Field field = target.getClass().getDeclaredField(fieldName);
|
||||
field.setAccessible(true);
|
||||
return (T) field.get(target);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
2
common/cache/pom.xml
vendored
2
common/cache/pom.xml
vendored
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -600,6 +600,7 @@ message TbAttributeDeleteProto {
|
||||
int64 tenantIdLSB = 5;
|
||||
string scope = 6;
|
||||
repeated string keys = 7;
|
||||
bool notifyDevice = 8;
|
||||
}
|
||||
|
||||
message TbTimeSeriesDeleteProto {
|
||||
|
||||
@ -22,7 +22,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -28,6 +28,7 @@ public class DataConstants {
|
||||
public static final String CLIENT_SCOPE = "CLIENT_SCOPE";
|
||||
public static final String SERVER_SCOPE = "SERVER_SCOPE";
|
||||
public static final String SHARED_SCOPE = "SHARED_SCOPE";
|
||||
public static final String NOTIFY_DEVICE_METADATA_KEY = "notifyDevice";
|
||||
public static final String LATEST_TS = "LATEST_TS";
|
||||
public static final String IS_NEW_ALARM = "isNewAlarm";
|
||||
public static final String IS_EXISTING_ALARM = "isExistingAlarm";
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>thingsboard</artifactId>
|
||||
</parent>
|
||||
<artifactId>common</artifactId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>script</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common.script</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>script</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common.script</groupId>
|
||||
|
||||
@ -27,8 +27,8 @@ import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.mvel2.ExecutionContext;
|
||||
import org.mvel2.MVEL;
|
||||
import org.mvel2.ParserContext;
|
||||
import org.mvel2.SandboxedParserConfiguration;
|
||||
import org.mvel2.SandboxedParserContext;
|
||||
import org.mvel2.ScriptMemoryOverflowException;
|
||||
import org.mvel2.optimizers.OptimizerFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
@ -47,15 +47,16 @@ import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.io.Serializable;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@Slf4j
|
||||
@ConditionalOnProperty(prefix = "tbel", value = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
@ -68,8 +69,6 @@ public class DefaultTbelInvokeService extends AbstractScriptInvokeService implem
|
||||
|
||||
private SandboxedParserConfiguration parserConfig;
|
||||
|
||||
private static final Pattern NEW_KEYWORD_PATTERN = Pattern.compile("new\\s");
|
||||
|
||||
@Getter
|
||||
@Value("${tbel.max_total_args_size:100000}")
|
||||
private long maxTotalArgsSize;
|
||||
@ -123,9 +122,11 @@ public class DefaultTbelInvokeService extends AbstractScriptInvokeService implem
|
||||
public void init() {
|
||||
super.init();
|
||||
OptimizerFactory.setDefaultOptimizer(OptimizerFactory.SAFE_REFLECTIVE);
|
||||
parserConfig = new SandboxedParserConfiguration();
|
||||
parserConfig = ParserContext.enableSandboxedMode();
|
||||
parserConfig.addImport("JSON", TbJson.class);
|
||||
parserConfig.registerDataType("Date", TbDate.class, date -> 8L);
|
||||
parserConfig.registerDataType("Random", Random.class, date -> 8L);
|
||||
parserConfig.registerDataType("Calendar", Calendar.class, date -> 8L);
|
||||
TbUtils.register(parserConfig);
|
||||
executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(threadPoolSize, "tbel-executor"));
|
||||
try {
|
||||
@ -219,7 +220,7 @@ public class DefaultTbelInvokeService extends AbstractScriptInvokeService implem
|
||||
}
|
||||
|
||||
private Serializable compileScript(String scriptBody) {
|
||||
return MVEL.compileExpression(scriptBody, new SandboxedParserContext(parserConfig));
|
||||
return MVEL.compileExpression(scriptBody, new ParserContext());
|
||||
}
|
||||
|
||||
@SuppressWarnings("UnstableApiUsage")
|
||||
|
||||
@ -17,6 +17,14 @@ package org.thingsboard.script.api.tbel;
|
||||
|
||||
import java.text.DateFormat;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.time.temporal.TemporalAccessor;
|
||||
import java.util.Date;
|
||||
import java.util.GregorianCalendar;
|
||||
import java.util.Locale;
|
||||
@ -24,6 +32,11 @@ import java.util.TimeZone;
|
||||
|
||||
public class TbDate extends Date {
|
||||
|
||||
private static final DateTimeFormatter isoDateFormatter = DateTimeFormatter.ofPattern(
|
||||
"yyyy-MM-dd[[ ]['T']HH:mm[:ss[.SSS]][ ][XXX][Z][z][VV][O]]").withZone(ZoneId.systemDefault());
|
||||
|
||||
private static final DateFormat isoDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
|
||||
|
||||
public TbDate() {
|
||||
super();
|
||||
}
|
||||
@ -60,8 +73,7 @@ public class TbDate extends Date {
|
||||
}
|
||||
|
||||
public String toISOString() {
|
||||
DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
|
||||
return formatter.format(this);
|
||||
return isoDateFormat.format(this);
|
||||
}
|
||||
|
||||
public String toLocaleString(String locale) {
|
||||
@ -79,11 +91,29 @@ public class TbDate extends Date {
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public static long parse(String value, String format) {
|
||||
try {
|
||||
DateFormat dateFormat = new SimpleDateFormat(format);
|
||||
return dateFormat.parse(value).getTime();
|
||||
} catch (Exception e) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
public static long parse(String value) {
|
||||
try {
|
||||
return Date.parse(value);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return -1;
|
||||
TemporalAccessor accessor = isoDateFormatter.parseBest(value,
|
||||
ZonedDateTime::from,
|
||||
LocalDateTime::from,
|
||||
LocalDate::from);
|
||||
Instant instant = Instant.from(accessor);
|
||||
return Instant.EPOCH.until(instant, ChronoUnit.MILLIS);
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
return Date.parse(value);
|
||||
} catch (IllegalArgumentException e2) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -24,6 +24,8 @@ import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
@ -65,12 +67,24 @@ public class TbUtils {
|
||||
String.class)));
|
||||
parserConfig.addImport("parseHexToInt", new MethodStub(TbUtils.class.getMethod("parseHexToInt",
|
||||
String.class, boolean.class)));
|
||||
parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt",
|
||||
List.class, int.class, int.class)));
|
||||
parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt",
|
||||
List.class, int.class, int.class, boolean.class)));
|
||||
parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt",
|
||||
byte[].class, int.class, int.class)));
|
||||
parserConfig.addImport("parseBytesToInt", new MethodStub(TbUtils.class.getMethod("parseBytesToInt",
|
||||
byte[].class, int.class, int.class, boolean.class)));
|
||||
parserConfig.addImport("toFixed", new MethodStub(TbUtils.class.getMethod("toFixed",
|
||||
double.class, int.class)));
|
||||
parserConfig.addImport("hexToBytes", new MethodStub(TbUtils.class.getMethod("hexToBytes",
|
||||
ExecutionContext.class, String.class)));
|
||||
parserConfig.addImport("base64ToHex", new MethodStub(TbUtils.class.getMethod("base64ToHex",
|
||||
String.class)));
|
||||
parserConfig.addImport("base64ToBytes", new MethodStub(TbUtils.class.getMethod("base64ToBytes",
|
||||
String.class)));
|
||||
parserConfig.addImport("bytesToBase64", new MethodStub(TbUtils.class.getMethod("bytesToBase64",
|
||||
byte[].class)));
|
||||
parserConfig.addImport("bytesToHex", new MethodStub(TbUtils.class.getMethod("bytesToHex",
|
||||
byte[].class)));
|
||||
parserConfig.addImport("bytesToHex", new MethodStub(TbUtils.class.getMethod("bytesToHex",
|
||||
@ -119,8 +133,8 @@ public class TbUtils {
|
||||
|
||||
private static List<Byte> bytesToList(ExecutionContext ctx, byte[] bytes) {
|
||||
List<Byte> list = new ExecutionArrayList<>(ctx);
|
||||
for (int i = 0; i < bytes.length; i++) {
|
||||
list.add(bytes[i]);
|
||||
for (byte aByte : bytes) {
|
||||
list.add(aByte);
|
||||
}
|
||||
return list;
|
||||
}
|
||||
@ -194,25 +208,25 @@ public class TbUtils {
|
||||
if (length > 8) {
|
||||
throw new IllegalArgumentException("Hex string is too large. Maximum 8 symbols allowed.");
|
||||
}
|
||||
if (bigEndian) {
|
||||
return Integer.parseInt(hex, 16);
|
||||
} else {
|
||||
if (length < 8) {
|
||||
hex = hex + "0".repeat(8 - length);
|
||||
}
|
||||
return Integer.reverseBytes(Integer.parseInt(hex, 16));
|
||||
if (length % 2 > 0) {
|
||||
throw new IllegalArgumentException("Hex string must be even-length.");
|
||||
}
|
||||
byte[] data = new byte[length / 2];
|
||||
for (int i = 0; i < length; i += 2) {
|
||||
data[i / 2] = (byte) ((Character.digit(hex.charAt(i), 16) << 4) + Character.digit(hex.charAt(i + 1), 16));
|
||||
}
|
||||
return parseBytesToInt(data, 0, data.length, bigEndian);
|
||||
}
|
||||
|
||||
public static ExecutionArrayList<Integer> hexToBytes(ExecutionContext ctx, String hex) {
|
||||
public static ExecutionArrayList<Byte> hexToBytes(ExecutionContext ctx, String hex) {
|
||||
int len = hex.length();
|
||||
if (len % 2 > 0) {
|
||||
throw new IllegalArgumentException("Hex string must be even-length.");
|
||||
}
|
||||
ExecutionArrayList<Integer> data = new ExecutionArrayList<>(ctx);
|
||||
ExecutionArrayList<Byte> data = new ExecutionArrayList<>(ctx);
|
||||
for (int i = 0; i < len; i += 2) {
|
||||
data.add((Character.digit(hex.charAt(i), 16) << 4)
|
||||
+ Character.digit(hex.charAt(i+1), 16));
|
||||
data.add((byte)((Character.digit(hex.charAt(i), 16) << 4)
|
||||
+ Character.digit(hex.charAt(i + 1), 16)));
|
||||
}
|
||||
return data;
|
||||
}
|
||||
@ -221,6 +235,50 @@ public class TbUtils {
|
||||
return bytesToHex(Base64.getDecoder().decode(base64));
|
||||
}
|
||||
|
||||
public static String bytesToBase64(byte[] bytes) {
|
||||
return Base64.getEncoder().encodeToString(bytes);
|
||||
}
|
||||
|
||||
public static byte[] base64ToBytes(String input) {
|
||||
return Base64.getDecoder().decode(input);
|
||||
}
|
||||
|
||||
public static int parseBytesToInt(List<Byte> data, int offset, int length) {
|
||||
return parseBytesToInt(data, offset, length, true);
|
||||
}
|
||||
|
||||
public static int parseBytesToInt(List<Byte> data, int offset, int length, boolean bigEndian) {
|
||||
final byte[] bytes = new byte[data.size()];
|
||||
for (int i = 0; i < bytes.length; i++) {
|
||||
bytes[i] = data.get(i);
|
||||
}
|
||||
return parseBytesToInt(bytes, offset, length, bigEndian);
|
||||
}
|
||||
|
||||
public static int parseBytesToInt(byte[] data, int offset, int length) {
|
||||
return parseBytesToInt(data, offset, length, true);
|
||||
}
|
||||
|
||||
public static int parseBytesToInt(byte[] data, int offset, int length, boolean bigEndian) {
|
||||
if (offset > data.length) {
|
||||
throw new IllegalArgumentException("Offset: " + offset + " is out of bounds for array with length: " + data.length + "!");
|
||||
}
|
||||
if (length > 4) {
|
||||
throw new IllegalArgumentException("Length: " + length + " is too large. Maximum 4 bytes is allowed!");
|
||||
}
|
||||
if (offset + length > data.length) {
|
||||
throw new IllegalArgumentException("Offset: " + offset + " and Length: " + length + " is out of bounds for array with length: " + data.length + "!");
|
||||
}
|
||||
var bb = ByteBuffer.allocate(4);
|
||||
if (!bigEndian) {
|
||||
bb.order(ByteOrder.LITTLE_ENDIAN);
|
||||
}
|
||||
bb.position(bigEndian ? 4 - length : 0);
|
||||
bb.put(data, offset, length);
|
||||
bb.position(0);
|
||||
return bb.getInt();
|
||||
}
|
||||
|
||||
public static String bytesToHex(ExecutionArrayList<?> bytesList) {
|
||||
byte[] bytes = new byte[bytesList.size()];
|
||||
for (int i = 0; i < bytesList.size(); i++) {
|
||||
|
||||
@ -0,0 +1,98 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.script.api.tbel;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class TbUtilsTest {
|
||||
|
||||
@Test
|
||||
public void parseHexToInt() {
|
||||
Assert.assertEquals(0xAB, TbUtils.parseHexToInt("AB"));
|
||||
Assert.assertEquals(0xABBA, TbUtils.parseHexToInt("ABBA", true));
|
||||
Assert.assertEquals(0xBAAB, TbUtils.parseHexToInt("ABBA", false));
|
||||
Assert.assertEquals(0xAABBCC, TbUtils.parseHexToInt("AABBCC", true));
|
||||
Assert.assertEquals(0xAABBCC, TbUtils.parseHexToInt("CCBBAA", false));
|
||||
Assert.assertEquals(0xAABBCCDD, TbUtils.parseHexToInt("AABBCCDD", true));
|
||||
Assert.assertEquals(0xAABBCCDD, TbUtils.parseHexToInt("DDCCBBAA", false));
|
||||
Assert.assertEquals(0xDDCCBBAA, TbUtils.parseHexToInt("DDCCBBAA", true));
|
||||
Assert.assertEquals(0xDDCCBBAA, TbUtils.parseHexToInt("AABBCCDD", false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parseBytesToInt_checkPrimitives() {
|
||||
int expected = 257;
|
||||
byte[] data = ByteBuffer.allocate(4).putInt(expected).array();
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 0, 4));
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 2, 2, true));
|
||||
Assert.assertEquals(1, TbUtils.parseBytesToInt(data, 3, 1, true));
|
||||
|
||||
expected = Integer.MAX_VALUE;
|
||||
data = ByteBuffer.allocate(4).putInt(expected).array();
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 0, 4, true));
|
||||
|
||||
expected = 0xAABBCCDD;
|
||||
data = new byte[]{(byte) 0xAA, (byte) 0xBB, (byte) 0xCC, (byte) 0xDD};
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 0, 4, true));
|
||||
data = new byte[]{(byte) 0xDD, (byte) 0xCC, (byte) 0xBB, (byte) 0xAA};
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 0, 4, false));
|
||||
|
||||
expected = 0xAABBCC;
|
||||
data = new byte[]{(byte) 0xAA, (byte) 0xBB, (byte) 0xCC};
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 0, 3, true));
|
||||
data = new byte[]{(byte) 0xCC, (byte) 0xBB, (byte) 0xAA};
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 0, 3, false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parseBytesToInt_checkLists() {
|
||||
int expected = 257;
|
||||
List<Byte> data = toList(ByteBuffer.allocate(4).putInt(expected).array());
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 0, 4));
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 2, 2, true));
|
||||
Assert.assertEquals(1, TbUtils.parseBytesToInt(data, 3, 1, true));
|
||||
|
||||
expected = Integer.MAX_VALUE;
|
||||
data = toList(ByteBuffer.allocate(4).putInt(expected).array());
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 0, 4, true));
|
||||
|
||||
expected = 0xAABBCCDD;
|
||||
data = toList(new byte[]{(byte) 0xAA, (byte) 0xBB, (byte) 0xCC, (byte) 0xDD});
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 0, 4, true));
|
||||
data = toList(new byte[]{(byte) 0xDD, (byte) 0xCC, (byte) 0xBB, (byte) 0xAA});
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 0, 4, false));
|
||||
|
||||
expected = 0xAABBCC;
|
||||
data = toList(new byte[]{(byte) 0xAA, (byte) 0xBB, (byte) 0xCC});
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 0, 3, true));
|
||||
data = toList(new byte[]{(byte) 0xCC, (byte) 0xBB, (byte) 0xAA});
|
||||
Assert.assertEquals(expected, TbUtils.parseBytesToInt(data, 0, 3, false));
|
||||
}
|
||||
|
||||
private static List<Byte> toList(byte[] data) {
|
||||
List<Byte> result = new ArrayList<>(data.length);
|
||||
for (Byte b : data) {
|
||||
result.add(b);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
@ -22,7 +22,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>transport</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common.transport</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>transport</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common.transport</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>transport</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common.transport</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>transport</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common.transport</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -21,7 +21,7 @@
|
||||
|
||||
<parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>transport</artifactId>
|
||||
</parent>
|
||||
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>transport</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common.transport</groupId>
|
||||
|
||||
@ -32,6 +32,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||
import org.thingsboard.server.common.data.ApiUsageState;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.DeviceTransportType;
|
||||
@ -585,7 +586,7 @@ public class DefaultTransportService implements TransportService {
|
||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||
metaData.putValue("deviceName", sessionInfo.getDeviceName());
|
||||
metaData.putValue("deviceType", sessionInfo.getDeviceType());
|
||||
metaData.putValue("notifyDevice", "false");
|
||||
metaData.putValue(DataConstants.NOTIFY_DEVICE_METADATA_KEY, "false");
|
||||
CustomerId customerId = getCustomerId(sessionInfo);
|
||||
sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST,
|
||||
new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback)));
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>common</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>thingsboard</artifactId>
|
||||
</parent>
|
||||
<artifactId>dao</artifactId>
|
||||
|
||||
@ -95,7 +95,7 @@ public class AssetProfileServiceImpl extends AbstractCachedEntityService<AssetPr
|
||||
log.trace("Executing findAssetProfileByName [{}][{}]", tenantId, profileName);
|
||||
Validator.validateString(profileName, INCORRECT_ASSET_PROFILE_NAME + profileName);
|
||||
return cache.getAndPutInTransaction(AssetProfileCacheKey.fromName(tenantId, profileName),
|
||||
() -> assetProfileDao.findByName(tenantId, profileName), true);
|
||||
() -> assetProfileDao.findByName(tenantId, profileName), false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -16,16 +16,16 @@
|
||||
package org.thingsboard.server.dao.sqlts.insert.sql;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.dao.DataAccessException;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.thingsboard.server.dao.timeseries.SqlPartition;
|
||||
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.PersistenceContext;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -36,9 +36,6 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
@Slf4j
|
||||
public class SqlPartitioningRepository {
|
||||
|
||||
@PersistenceContext
|
||||
private EntityManager entityManager;
|
||||
|
||||
@Autowired
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
@ -50,12 +47,12 @@ public class SqlPartitioningRepository {
|
||||
private final Map<String, Map<Long, SqlPartition>> tablesPartitions = new ConcurrentHashMap<>();
|
||||
private final ReentrantLock partitionCreationLock = new ReentrantLock();
|
||||
|
||||
@Transactional
|
||||
@Transactional(propagation = Propagation.NOT_SUPPORTED)
|
||||
public void save(SqlPartition partition) {
|
||||
entityManager.createNativeQuery(partition.getQuery()).executeUpdate();
|
||||
jdbcTemplate.execute(partition.getQuery());
|
||||
}
|
||||
|
||||
@Transactional
|
||||
@Transactional(propagation = Propagation.NOT_SUPPORTED) // executing non-transactionally, so that parent transaction is not aborted on partition save error
|
||||
public void createPartitionIfNotExists(String table, long entityTs, long partitionDurationMs) {
|
||||
long partitionStartTs = calculatePartitionStartTime(entityTs, partitionDurationMs);
|
||||
Map<Long, SqlPartition> partitions = tablesPartitions.computeIfAbsent(table, t -> new ConcurrentHashMap<>());
|
||||
@ -64,19 +61,17 @@ public class SqlPartitioningRepository {
|
||||
partitionCreationLock.lock();
|
||||
try {
|
||||
if (partitions.containsKey(partitionStartTs)) return;
|
||||
log.trace("Saving partition: {}", partition);
|
||||
log.info("Saving partition {}-{} for table {}", partition.getStart(), partition.getEnd(), table);
|
||||
save(partition);
|
||||
log.trace("Adding partition to map: {}", partition);
|
||||
partitions.put(partition.getStart(), partition);
|
||||
} catch (RuntimeException e) {
|
||||
log.trace("Error occurred during partition save:", e);
|
||||
String msg = ExceptionUtils.getRootCauseMessage(e);
|
||||
if (msg.contains("would overlap partition")) {
|
||||
log.warn("Couldn't save {} partition for {}, data will be saved to the default partition. SQL error: {}",
|
||||
partition.getPartitionDate(), table, msg);
|
||||
} catch (Exception e) {
|
||||
String error = ExceptionUtils.getRootCauseMessage(e);
|
||||
if (StringUtils.containsAny(error, "would overlap partition", "already exists")) {
|
||||
partitions.put(partition.getStart(), partition);
|
||||
log.debug("Couldn't save partition {}-{} for table {}: {}", partition.getStart(), partition.getEnd(), table, error);
|
||||
} else {
|
||||
throw e;
|
||||
log.warn("Couldn't save partition {}-{} for table {}: {}", partition.getStart(), partition.getEnd(), table, error);
|
||||
}
|
||||
} finally {
|
||||
partitionCreationLock.unlock();
|
||||
|
||||
@ -50,8 +50,12 @@ CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribu
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_audit_log_id ON audit_log(id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_edge_event_tenant_id_and_created_time ON edge_event(tenant_id, created_time DESC);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_edge_event_id ON edge_event(id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_device_external_id ON device(tenant_id, external_id);
|
||||
|
||||
@ -195,3 +195,35 @@ function checkFolders() {
|
||||
done < <(echo "$PERMISSION_LIST")
|
||||
return $EXIT_CODE
|
||||
}
|
||||
|
||||
function composeVersion() {
|
||||
#Checking whether "set -e" shell option should be restored after Compose version check
|
||||
FLAG_SET=false
|
||||
if [[ $SHELLOPTS =~ errexit ]]; then
|
||||
set +e
|
||||
FLAG_SET=true
|
||||
fi
|
||||
|
||||
#Checking Compose V1 availablity
|
||||
docker-compose version >/dev/null 2>&1
|
||||
if [ $? -eq 0 ]; then status_v1=true; else status_v1=false; fi
|
||||
|
||||
#Checking Compose V2 availablity
|
||||
docker compose version >/dev/null 2>&1
|
||||
if [ $? -eq 0 ]; then status_v2=true; else status_v2=false; fi
|
||||
|
||||
COMPOSE_VERSION=""
|
||||
|
||||
if $status_v2 ; then
|
||||
COMPOSE_VERSION="V2"
|
||||
elif $status_v1 ; then
|
||||
COMPOSE_VERSION="V1"
|
||||
else
|
||||
echo "Docker Compose plugin is not detected. Please check your environment." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo $COMPOSE_VERSION
|
||||
|
||||
if $FLAG_SET ; then set -e; fi
|
||||
}
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
tb-js-executor:
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
cassandra:
|
||||
@ -23,5 +23,5 @@ services:
|
||||
|
||||
volumes:
|
||||
cassandra-volume:
|
||||
external: true
|
||||
name: ${CASSANDRA_DATA_VOLUME}
|
||||
external:
|
||||
name: ${CASSANDRA_DATA_VOLUME}
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
tb-js-executor:
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
postgres:
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
kafka:
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
postgres:
|
||||
@ -23,5 +23,5 @@ services:
|
||||
|
||||
volumes:
|
||||
postgres-db-volume:
|
||||
external: true
|
||||
name: ${POSTGRES_DATA_VOLUME}
|
||||
external:
|
||||
name: ${POSTGRES_DATA_VOLUME}
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
postgres:
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
volumes:
|
||||
prometheus_data: {}
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
tb-js-executor:
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
tb-js-executor:
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
# Redis cluster
|
||||
@ -39,20 +39,20 @@ services:
|
||||
|
||||
volumes:
|
||||
redis-cluster-data-0:
|
||||
external: true
|
||||
name: ${REDIS_CLUSTER_DATA_VOLUME_0}
|
||||
external:
|
||||
name: ${REDIS_CLUSTER_DATA_VOLUME_0}
|
||||
redis-cluster-data-1:
|
||||
external: true
|
||||
name: ${REDIS_CLUSTER_DATA_VOLUME_1}
|
||||
external:
|
||||
name: ${REDIS_CLUSTER_DATA_VOLUME_1}
|
||||
redis-cluster-data-2:
|
||||
external: true
|
||||
name: ${REDIS_CLUSTER_DATA_VOLUME_2}
|
||||
external:
|
||||
name: ${REDIS_CLUSTER_DATA_VOLUME_2}
|
||||
redis-cluster-data-3:
|
||||
external: true
|
||||
name: ${REDIS_CLUSTER_DATA_VOLUME_3}
|
||||
external:
|
||||
name: ${REDIS_CLUSTER_DATA_VOLUME_3}
|
||||
redis-cluster-data-4:
|
||||
external: true
|
||||
name: ${REDIS_CLUSTER_DATA_VOLUME_4}
|
||||
external:
|
||||
name: ${REDIS_CLUSTER_DATA_VOLUME_4}
|
||||
redis-cluster-data-5:
|
||||
external: true
|
||||
name: ${REDIS_CLUSTER_DATA_VOLUME_5}
|
||||
external:
|
||||
name: ${REDIS_CLUSTER_DATA_VOLUME_5}
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
# Redis cluster
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
redis:
|
||||
@ -23,5 +23,5 @@ services:
|
||||
|
||||
volumes:
|
||||
redis-data:
|
||||
external: true
|
||||
name: ${REDIS_DATA_VOLUME}
|
||||
external:
|
||||
name: ${REDIS_DATA_VOLUME}
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
# Redis standalone
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
tb-js-executor:
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
tb-core1:
|
||||
@ -59,23 +59,23 @@ services:
|
||||
|
||||
volumes:
|
||||
tb-log-volume:
|
||||
external: true
|
||||
name: ${TB_LOG_VOLUME}
|
||||
external:
|
||||
name: ${TB_LOG_VOLUME}
|
||||
tb-coap-transport-log-volume:
|
||||
external: true
|
||||
name: ${TB_COAP_TRANSPORT_LOG_VOLUME}
|
||||
external:
|
||||
name: ${TB_COAP_TRANSPORT_LOG_VOLUME}
|
||||
tb-lwm2m-transport-log-volume:
|
||||
external: true
|
||||
name: ${TB_LWM2M_TRANSPORT_LOG_VOLUME}
|
||||
external:
|
||||
name: ${TB_LWM2M_TRANSPORT_LOG_VOLUME}
|
||||
tb-http-transport-log-volume:
|
||||
external: true
|
||||
name: ${TB_HTTP_TRANSPORT_LOG_VOLUME}
|
||||
external:
|
||||
name: ${TB_HTTP_TRANSPORT_LOG_VOLUME}
|
||||
tb-mqtt-transport-log-volume:
|
||||
external: true
|
||||
name: ${TB_MQTT_TRANSPORT_LOG_VOLUME}
|
||||
external:
|
||||
name: ${TB_MQTT_TRANSPORT_LOG_VOLUME}
|
||||
tb-snmp-transport-log-volume:
|
||||
external: true
|
||||
name: ${TB_SNMP_TRANSPORT_LOG_VOLUME}
|
||||
external:
|
||||
name: ${TB_SNMP_TRANSPORT_LOG_VOLUME}
|
||||
tb-vc-executor-log-volume:
|
||||
external: true
|
||||
name: ${TB_VC_EXECUTOR_LOG_VOLUME}
|
||||
external:
|
||||
name: ${TB_VC_EXECUTOR_LOG_VOLUME}
|
||||
|
||||
@ -15,7 +15,7 @@
|
||||
#
|
||||
|
||||
|
||||
version: '2.2'
|
||||
version: '3.0'
|
||||
|
||||
services:
|
||||
zookeeper:
|
||||
@ -30,7 +30,8 @@ services:
|
||||
tb-js-executor:
|
||||
restart: always
|
||||
image: "${DOCKER_REPO}/${JS_EXECUTOR_DOCKER_NAME}:${TB_VERSION}"
|
||||
scale: 10
|
||||
deploy:
|
||||
replicas: 10
|
||||
env_file:
|
||||
- tb-js-executor.env
|
||||
tb-core1:
|
||||
|
||||
@ -41,6 +41,8 @@ set -e
|
||||
|
||||
source compose-utils.sh
|
||||
|
||||
COMPOSE_VERSION=$(composeVersion) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_QUEUE_ARGS=$(additionalComposeQueueArgs) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_ARGS=$(additionalComposeArgs) || exit $?
|
||||
@ -52,14 +54,39 @@ ADDITIONAL_STARTUP_SERVICES=$(additionalStartupServices) || exit $?
|
||||
checkFolders --create || exit $?
|
||||
|
||||
if [ ! -z "${ADDITIONAL_STARTUP_SERVICES// }" ]; then
|
||||
docker-compose \
|
||||
-f docker-compose.yml $ADDITIONAL_CACHE_ARGS $ADDITIONAL_COMPOSE_ARGS $ADDITIONAL_COMPOSE_QUEUE_ARGS \
|
||||
up -d $ADDITIONAL_STARTUP_SERVICES
|
||||
|
||||
COMPOSE_ARGS="\
|
||||
-f docker-compose.yml ${ADDITIONAL_CACHE_ARGS} ${ADDITIONAL_COMPOSE_ARGS} ${ADDITIONAL_COMPOSE_QUEUE_ARGS} \
|
||||
up -d ${ADDITIONAL_STARTUP_SERVICES}"
|
||||
|
||||
case $COMPOSE_VERSION in
|
||||
V2)
|
||||
docker compose $COMPOSE_ARGS
|
||||
;;
|
||||
V1)
|
||||
docker-compose $COMPOSE_ARGS
|
||||
;;
|
||||
*)
|
||||
# unknown option
|
||||
;;
|
||||
esac
|
||||
fi
|
||||
|
||||
docker-compose \
|
||||
-f docker-compose.yml $ADDITIONAL_CACHE_ARGS $ADDITIONAL_COMPOSE_ARGS $ADDITIONAL_COMPOSE_QUEUE_ARGS \
|
||||
run --no-deps --rm -e INSTALL_TB=true -e LOAD_DEMO=${loadDemo} \
|
||||
tb-core1
|
||||
COMPOSE_ARGS="\
|
||||
-f docker-compose.yml ${ADDITIONAL_CACHE_ARGS} ${ADDITIONAL_COMPOSE_ARGS} ${ADDITIONAL_COMPOSE_QUEUE_ARGS} \
|
||||
run --no-deps --rm -e INSTALL_TB=true -e LOAD_DEMO=${loadDemo} \
|
||||
tb-core1"
|
||||
|
||||
case $COMPOSE_VERSION in
|
||||
V2)
|
||||
docker compose $COMPOSE_ARGS
|
||||
;;
|
||||
V1)
|
||||
docker-compose $COMPOSE_ARGS
|
||||
;;
|
||||
*)
|
||||
# unknown option
|
||||
;;
|
||||
esac
|
||||
|
||||
|
||||
|
||||
@ -19,6 +19,8 @@ set -e
|
||||
|
||||
source compose-utils.sh
|
||||
|
||||
COMPOSE_VERSION=$(composeVersion) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_QUEUE_ARGS=$(additionalComposeQueueArgs) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_ARGS=$(additionalComposeArgs) || exit $?
|
||||
@ -27,6 +29,18 @@ ADDITIONAL_CACHE_ARGS=$(additionalComposeCacheArgs) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_MONITORING_ARGS=$(additionalComposeMonitoringArgs) || exit $?
|
||||
|
||||
docker-compose \
|
||||
-f docker-compose.yml $ADDITIONAL_CACHE_ARGS $ADDITIONAL_COMPOSE_ARGS $ADDITIONAL_COMPOSE_QUEUE_ARGS $ADDITIONAL_COMPOSE_MONITORING_ARGS \
|
||||
down -v
|
||||
COMPOSE_ARGS="\
|
||||
-f docker-compose.yml ${ADDITIONAL_CACHE_ARGS} ${ADDITIONAL_COMPOSE_ARGS} ${ADDITIONAL_COMPOSE_QUEUE_ARGS} ${ADDITIONAL_COMPOSE_MONITORING_ARGS} \
|
||||
down -v"
|
||||
|
||||
case $COMPOSE_VERSION in
|
||||
V2)
|
||||
docker compose $COMPOSE_ARGS
|
||||
;;
|
||||
V1)
|
||||
docker-compose $COMPOSE_ARGS
|
||||
;;
|
||||
*)
|
||||
# unknown option
|
||||
;;
|
||||
esac
|
||||
|
||||
@ -19,6 +19,8 @@ set -e
|
||||
|
||||
source compose-utils.sh
|
||||
|
||||
COMPOSE_VERSION=$(composeVersion) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_QUEUE_ARGS=$(additionalComposeQueueArgs) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_ARGS=$(additionalComposeArgs) || exit $?
|
||||
@ -29,6 +31,18 @@ ADDITIONAL_COMPOSE_MONITORING_ARGS=$(additionalComposeMonitoringArgs) || exit $?
|
||||
|
||||
checkFolders --create || exit $?
|
||||
|
||||
docker-compose \
|
||||
-f docker-compose.yml $ADDITIONAL_CACHE_ARGS $ADDITIONAL_COMPOSE_ARGS $ADDITIONAL_COMPOSE_QUEUE_ARGS $ADDITIONAL_COMPOSE_MONITORING_ARGS \
|
||||
up -d
|
||||
COMPOSE_ARGS="\
|
||||
-f docker-compose.yml ${ADDITIONAL_CACHE_ARGS} ${ADDITIONAL_COMPOSE_ARGS} ${ADDITIONAL_COMPOSE_QUEUE_ARGS} ${ADDITIONAL_COMPOSE_MONITORING_ARGS} \
|
||||
up -d"
|
||||
|
||||
case $COMPOSE_VERSION in
|
||||
V2)
|
||||
docker compose $COMPOSE_ARGS
|
||||
;;
|
||||
V1)
|
||||
docker-compose --compatibility $COMPOSE_ARGS
|
||||
;;
|
||||
*)
|
||||
# unknown option
|
||||
;;
|
||||
esac
|
||||
|
||||
@ -19,6 +19,8 @@ set -e
|
||||
|
||||
source compose-utils.sh
|
||||
|
||||
COMPOSE_VERSION=$(composeVersion) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_QUEUE_ARGS=$(additionalComposeQueueArgs) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_ARGS=$(additionalComposeArgs) || exit $?
|
||||
@ -27,6 +29,18 @@ ADDITIONAL_CACHE_ARGS=$(additionalComposeCacheArgs) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_MONITORING_ARGS=$(additionalComposeMonitoringArgs) || exit $?
|
||||
|
||||
docker-compose \
|
||||
-f docker-compose.yml $ADDITIONAL_CACHE_ARGS $ADDITIONAL_COMPOSE_ARGS $ADDITIONAL_COMPOSE_QUEUE_ARGS $ADDITIONAL_COMPOSE_MONITORING_ARGS \
|
||||
stop
|
||||
COMPOSE_ARGS="\
|
||||
-f docker-compose.yml ${ADDITIONAL_CACHE_ARGS} ${ADDITIONAL_COMPOSE_ARGS} ${ADDITIONAL_COMPOSE_QUEUE_ARGS} ${ADDITIONAL_COMPOSE_MONITORING_ARGS} \
|
||||
stop"
|
||||
|
||||
case $COMPOSE_VERSION in
|
||||
V2)
|
||||
docker compose $COMPOSE_ARGS
|
||||
;;
|
||||
V1)
|
||||
docker-compose $COMPOSE_ARGS
|
||||
;;
|
||||
*)
|
||||
# unknown option
|
||||
;;
|
||||
esac
|
||||
|
||||
@ -19,15 +19,32 @@ set -e
|
||||
|
||||
source compose-utils.sh
|
||||
|
||||
COMPOSE_VERSION=$(composeVersion) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_QUEUE_ARGS=$(additionalComposeQueueArgs) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_ARGS=$(additionalComposeArgs) || exit $?
|
||||
|
||||
ADDITIONAL_CACHE_ARGS=$(additionalComposeCacheArgs) || exit $?
|
||||
|
||||
docker-compose \
|
||||
-f docker-compose.yml $ADDITIONAL_CACHE_ARGS $ADDITIONAL_COMPOSE_ARGS $ADDITIONAL_COMPOSE_QUEUE_ARGS \
|
||||
pull $@
|
||||
docker-compose \
|
||||
-f docker-compose.yml $ADDITIONAL_CACHE_ARGS $ADDITIONAL_COMPOSE_ARGS $ADDITIONAL_COMPOSE_QUEUE_ARGS \
|
||||
up -d --no-deps --build $@
|
||||
COMPOSE_ARGS_PULL="\
|
||||
-f docker-compose.yml ${ADDITIONAL_CACHE_ARGS} ${ADDITIONAL_COMPOSE_ARGS} ${ADDITIONAL_COMPOSE_QUEUE_ARGS} \
|
||||
pull"
|
||||
|
||||
COMPOSE_ARGS_BUILD="\
|
||||
-f docker-compose.yml ${ADDITIONAL_CACHE_ARGS} ${ADDITIONAL_COMPOSE_ARGS} ${ADDITIONAL_COMPOSE_QUEUE_ARGS} \
|
||||
up -d --no-deps --build"
|
||||
|
||||
case $COMPOSE_VERSION in
|
||||
V2)
|
||||
docker compose $COMPOSE_ARGS_PULL $@
|
||||
docker compose $COMPOSE_ARGS_BUILD $@
|
||||
;;
|
||||
V1)
|
||||
docker-compose $COMPOSE_ARGS_PULL $@
|
||||
docker-compose $COMPOSE_ARGS_BUILD $@
|
||||
;;
|
||||
*)
|
||||
# unknown option
|
||||
;;
|
||||
esac
|
||||
|
||||
@ -40,6 +40,8 @@ set -e
|
||||
|
||||
source compose-utils.sh
|
||||
|
||||
COMPOSE_VERSION=$(composeVersion) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_QUEUE_ARGS=$(additionalComposeQueueArgs) || exit $?
|
||||
|
||||
ADDITIONAL_COMPOSE_ARGS=$(additionalComposeArgs) || exit $?
|
||||
@ -50,16 +52,32 @@ ADDITIONAL_STARTUP_SERVICES=$(additionalStartupServices) || exit $?
|
||||
|
||||
checkFolders --create || exit $?
|
||||
|
||||
docker-compose \
|
||||
-f docker-compose.yml $ADDITIONAL_CACHE_ARGS $ADDITIONAL_COMPOSE_ARGS $ADDITIONAL_COMPOSE_QUEUE_ARGS \
|
||||
pull \
|
||||
tb-core1
|
||||
COMPOSE_ARGS_PULL="\
|
||||
-f docker-compose.yml ${ADDITIONAL_CACHE_ARGS} ${ADDITIONAL_COMPOSE_ARGS} ${ADDITIONAL_COMPOSE_QUEUE_ARGS} \
|
||||
pull \
|
||||
tb-core1"
|
||||
|
||||
docker-compose \
|
||||
-f docker-compose.yml $ADDITIONAL_CACHE_ARGS $ADDITIONAL_COMPOSE_ARGS $ADDITIONAL_COMPOSE_QUEUE_ARGS \
|
||||
up -d $ADDITIONAL_STARTUP_SERVICES
|
||||
COMPOSE_ARGS_UP="\
|
||||
-f docker-compose.yml ${ADDITIONAL_CACHE_ARGS} ${ADDITIONAL_COMPOSE_ARGS} ${ADDITIONAL_COMPOSE_QUEUE_ARGS} \
|
||||
up -d ${ADDITIONAL_STARTUP_SERVICES}"
|
||||
|
||||
docker-compose \
|
||||
-f docker-compose.yml $ADDITIONAL_CACHE_ARGS $ADDITIONAL_COMPOSE_ARGS $ADDITIONAL_COMPOSE_QUEUE_ARGS \
|
||||
run --no-deps --rm -e UPGRADE_TB=true -e FROM_VERSION=${fromVersion} \
|
||||
tb-core1
|
||||
COMPOSE_ARGS_RUN="\
|
||||
-f docker-compose.yml ${ADDITIONAL_CACHE_ARGS} ${ADDITIONAL_COMPOSE_ARGS} ${ADDITIONAL_COMPOSE_QUEUE_ARGS} \
|
||||
run --no-deps --rm -e UPGRADE_TB=true -e FROM_VERSION=${fromVersion} \
|
||||
tb-core1"
|
||||
|
||||
case $COMPOSE_VERSION in
|
||||
V2)
|
||||
docker compose $COMPOSE_ARGS_PULL
|
||||
docker compose $COMPOSE_ARGS_UP
|
||||
docker compose $COMPOSE_ARGS_RUN
|
||||
;;
|
||||
V1)
|
||||
docker-compose $COMPOSE_ARGS_PULL
|
||||
docker-compose $COMPOSE_ARGS_UP
|
||||
docker-compose $COMPOSE_ARGS_RUN
|
||||
;;
|
||||
*)
|
||||
# unknown option
|
||||
;;
|
||||
esac
|
||||
|
||||
@ -21,7 +21,7 @@
|
||||
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>msa</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.msa</groupId>
|
||||
@ -92,6 +92,10 @@
|
||||
<artifactId>awaitility</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.californium</groupId>
|
||||
<artifactId>californium-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
@ -160,6 +164,12 @@
|
||||
<artifactId>snmp</artifactId>
|
||||
<type>docker-info</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.thingsboard.common</groupId>
|
||||
<artifactId>message</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
|
||||
@ -26,7 +26,14 @@ import org.apache.http.ssl.SSLContexts;
|
||||
import org.testng.annotations.AfterSuite;
|
||||
import org.testng.annotations.BeforeSuite;
|
||||
import org.testng.annotations.Listeners;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.device.profile.AllowCreateNewDevicesDeviceProfileProvisionConfiguration;
|
||||
import org.thingsboard.server.common.data.device.profile.CheckPreProvisionedDevicesDeviceProfileProvisionConfiguration;
|
||||
import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
|
||||
import org.thingsboard.server.common.data.device.profile.DeviceProfileProvisionConfiguration;
|
||||
import org.thingsboard.server.common.data.device.profile.DisabledDeviceProfileProvisionConfiguration;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
|
||||
import java.net.URI;
|
||||
@ -37,6 +44,9 @@ import java.util.Random;
|
||||
@Slf4j
|
||||
@Listeners(TestListener.class)
|
||||
public abstract class AbstractContainerTest {
|
||||
|
||||
protected final static String TEST_PROVISION_DEVICE_KEY = "test_provision_key";
|
||||
protected final static String TEST_PROVISION_DEVICE_SECRET = "test_provision_secret";
|
||||
protected static long timeoutMultiplier = 1;
|
||||
protected ObjectMapper mapper = new ObjectMapper();
|
||||
private static final ContainerTestSuite containerTestSuite = ContainerTestSuite.getInstance();
|
||||
@ -150,4 +160,28 @@ public abstract class AbstractContainerTest {
|
||||
}
|
||||
}
|
||||
|
||||
protected DeviceProfile updateDeviceProfileWithProvisioningStrategy(DeviceProfile deviceProfile, DeviceProfileProvisionType provisionType) {
|
||||
DeviceProfileProvisionConfiguration provisionConfiguration;
|
||||
String testProvisionDeviceKey = TEST_PROVISION_DEVICE_KEY;
|
||||
deviceProfile.setProvisionType(provisionType);
|
||||
switch(provisionType) {
|
||||
case ALLOW_CREATE_NEW_DEVICES:
|
||||
provisionConfiguration = new AllowCreateNewDevicesDeviceProfileProvisionConfiguration(TEST_PROVISION_DEVICE_SECRET);
|
||||
break;
|
||||
case CHECK_PRE_PROVISIONED_DEVICES:
|
||||
provisionConfiguration = new CheckPreProvisionedDevicesDeviceProfileProvisionConfiguration(TEST_PROVISION_DEVICE_SECRET);
|
||||
break;
|
||||
default:
|
||||
case DISABLED:
|
||||
testProvisionDeviceKey = null;
|
||||
provisionConfiguration = new DisabledDeviceProfileProvisionConfiguration(null);
|
||||
break;
|
||||
}
|
||||
DeviceProfileData deviceProfileData = deviceProfile.getProfileData();
|
||||
deviceProfileData.setProvisionConfiguration(provisionConfiguration);
|
||||
deviceProfile.setProfileData(deviceProfileData);
|
||||
deviceProfile.setProvisionDeviceKey(testProvisionDeviceKey);
|
||||
return testRestClient.postDeviceProfile(deviceProfile);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,121 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.msa;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.eclipse.californium.core.CoapClient;
|
||||
import org.eclipse.californium.core.CoapHandler;
|
||||
import org.eclipse.californium.core.CoapObserveRelation;
|
||||
import org.eclipse.californium.core.CoapResponse;
|
||||
import org.eclipse.californium.core.coap.CoAP;
|
||||
import org.eclipse.californium.core.coap.MediaTypeRegistry;
|
||||
import org.eclipse.californium.core.coap.Request;
|
||||
import org.eclipse.californium.elements.exception.ConnectorException;
|
||||
import org.thingsboard.server.common.msg.session.FeatureType;
|
||||
|
||||
public class TestCoapClient {
|
||||
|
||||
private static final String COAP_BASE_URL = "coap://localhost:5683/api/v1/";
|
||||
private static final long CLIENT_REQUEST_TIMEOUT = 60000L;
|
||||
|
||||
private final CoapClient client;
|
||||
|
||||
public TestCoapClient(){
|
||||
this.client = createClient();
|
||||
}
|
||||
|
||||
public TestCoapClient(String accessToken, FeatureType featureType) {
|
||||
this.client = createClient(getFeatureTokenUrl(accessToken, featureType));
|
||||
}
|
||||
|
||||
public TestCoapClient(String featureTokenUrl) {
|
||||
this.client = createClient(featureTokenUrl);
|
||||
}
|
||||
|
||||
public void connectToCoap(String accessToken) {
|
||||
setURI(accessToken, null);
|
||||
}
|
||||
|
||||
public void connectToCoap(String accessToken, FeatureType featureType) {
|
||||
setURI(accessToken, featureType);
|
||||
}
|
||||
|
||||
public void disconnect() {
|
||||
if (client != null) {
|
||||
client.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public CoapResponse postMethod(String requestBody) throws ConnectorException, IOException {
|
||||
return this.postMethod(requestBody.getBytes());
|
||||
}
|
||||
|
||||
public CoapResponse postMethod(byte[] requestBodyBytes) throws ConnectorException, IOException {
|
||||
return client.setTimeout(CLIENT_REQUEST_TIMEOUT).post(requestBodyBytes, MediaTypeRegistry.APPLICATION_JSON);
|
||||
}
|
||||
|
||||
public void postMethod(CoapHandler handler, String payload, int format) {
|
||||
client.post(handler, payload, format);
|
||||
}
|
||||
|
||||
public void postMethod(CoapHandler handler, byte[] payload, int format) {
|
||||
client.post(handler, payload, format);
|
||||
}
|
||||
|
||||
public CoapResponse getMethod() throws ConnectorException, IOException {
|
||||
return client.setTimeout(CLIENT_REQUEST_TIMEOUT).get();
|
||||
}
|
||||
|
||||
public CoapObserveRelation getObserveRelation(TestCoapClientCallback callback){
|
||||
Request request = Request.newGet().setObserve();
|
||||
request.setType(CoAP.Type.CON);
|
||||
return client.observe(request, callback);
|
||||
}
|
||||
|
||||
public void setURI(String featureTokenUrl) {
|
||||
if (client == null) {
|
||||
throw new RuntimeException("Failed to connect! CoapClient is not initialized!");
|
||||
}
|
||||
client.setURI(featureTokenUrl);
|
||||
}
|
||||
|
||||
public void setURI(String accessToken, FeatureType featureType) {
|
||||
if (featureType == null){
|
||||
featureType = FeatureType.ATTRIBUTES;
|
||||
}
|
||||
setURI(getFeatureTokenUrl(accessToken, featureType));
|
||||
}
|
||||
|
||||
private CoapClient createClient() {
|
||||
return new CoapClient();
|
||||
}
|
||||
|
||||
private CoapClient createClient(String featureTokenUrl) {
|
||||
return new CoapClient(featureTokenUrl);
|
||||
}
|
||||
|
||||
public static String getFeatureTokenUrl(FeatureType featureType) {
|
||||
return COAP_BASE_URL + featureType.name().toLowerCase();
|
||||
}
|
||||
|
||||
public static String getFeatureTokenUrl(String token, FeatureType featureType) {
|
||||
return COAP_BASE_URL + token + "/" + featureType.name().toLowerCase();
|
||||
}
|
||||
|
||||
public static String getFeatureTokenUrl(String token, FeatureType featureType, int requestId) {
|
||||
return COAP_BASE_URL + token + "/" + featureType.name().toLowerCase() + "/" + requestId;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,68 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.msa;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.californium.core.CoapHandler;
|
||||
import org.eclipse.californium.core.CoapResponse;
|
||||
import org.eclipse.californium.core.coap.CoAP;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
@Slf4j
|
||||
@Data
|
||||
public class TestCoapClientCallback implements CoapHandler {
|
||||
|
||||
protected final CountDownLatch latch;
|
||||
protected Integer observe;
|
||||
protected byte[] payloadBytes;
|
||||
protected CoAP.ResponseCode responseCode;
|
||||
|
||||
public TestCoapClientCallback() {
|
||||
this.latch = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
public TestCoapClientCallback(int subscribeCount) {
|
||||
this.latch = new CountDownLatch(subscribeCount);
|
||||
}
|
||||
|
||||
public Integer getObserve() {
|
||||
return observe;
|
||||
}
|
||||
|
||||
public byte[] getPayloadBytes() {
|
||||
return payloadBytes;
|
||||
}
|
||||
|
||||
public CoAP.ResponseCode getResponseCode() {
|
||||
return responseCode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onLoad(CoapResponse response) {
|
||||
observe = response.getOptions().getObserve();
|
||||
payloadBytes = response.getPayload();
|
||||
responseCode = response.getCode();
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError() {
|
||||
log.warn("Command Response Ack Error, No connect");
|
||||
}
|
||||
|
||||
}
|
||||
@ -27,7 +27,9 @@ import io.restassured.path.json.JsonPath;
|
||||
import io.restassured.response.ValidatableResponse;
|
||||
import io.restassured.specification.RequestSpecification;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
@ -93,6 +95,15 @@ public class TestRestClient {
|
||||
.as(Device.class);
|
||||
}
|
||||
|
||||
public Device getDeviceByName(String deviceName) {
|
||||
return given().spec(requestSpec).pathParam("deviceName", deviceName)
|
||||
.get("/api/tenant/devices?deviceName={deviceName}")
|
||||
.then()
|
||||
.statusCode(HTTP_OK)
|
||||
.extract()
|
||||
.as(Device.class);
|
||||
}
|
||||
|
||||
public ValidatableResponse getDeviceById(DeviceId deviceId, int statusCode) {
|
||||
return given().spec(requestSpec)
|
||||
.pathParams("deviceId", deviceId.getId())
|
||||
@ -159,6 +170,14 @@ public class TestRestClient {
|
||||
.as(JsonNode.class);
|
||||
}
|
||||
|
||||
public JsonPath postProvisionRequest(String provisionRequest) {
|
||||
return given().spec(requestSpec)
|
||||
.body(provisionRequest)
|
||||
.post("/api/v1/provision")
|
||||
.getBody()
|
||||
.jsonPath();
|
||||
}
|
||||
|
||||
public PageData<RuleChain> getRuleChains(PageLink pageLink) {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
addPageLinkToParam(params, pageLink);
|
||||
@ -252,6 +271,24 @@ public class TestRestClient {
|
||||
.as(JsonNode.class);
|
||||
}
|
||||
|
||||
public DeviceProfile getDeviceProfileById(DeviceProfileId deviceProfileId) {
|
||||
return given().spec(requestSpec).get("/api/deviceProfile/{deviceProfileId}", deviceProfileId.getId())
|
||||
.then()
|
||||
.assertThat()
|
||||
.statusCode(HTTP_OK)
|
||||
.extract()
|
||||
.as(DeviceProfile.class);
|
||||
}
|
||||
|
||||
public DeviceProfile postDeviceProfile(DeviceProfile deviceProfile) {
|
||||
return given().spec(requestSpec).body(deviceProfile)
|
||||
.post("/api/deviceProfile")
|
||||
.then()
|
||||
.statusCode(HTTP_OK)
|
||||
.extract()
|
||||
.as(DeviceProfile.class);
|
||||
}
|
||||
|
||||
public String getToken() {
|
||||
return token;
|
||||
}
|
||||
|
||||
@ -0,0 +1,120 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.msa.connectivity;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.gson.JsonObject;
|
||||
import io.restassured.path.json.JsonPath;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||
import org.thingsboard.server.common.msg.session.FeatureType;
|
||||
import org.thingsboard.server.msa.AbstractContainerTest;
|
||||
import org.thingsboard.server.msa.TestCoapClient;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.thingsboard.server.msa.prototypes.DevicePrototypes.defaultDevicePrototype;
|
||||
|
||||
public class CoapClientTest extends AbstractContainerTest {
|
||||
private TestCoapClient client;
|
||||
|
||||
private Device device;
|
||||
@BeforeMethod
|
||||
public void setUp() throws Exception {
|
||||
testRestClient.login("tenant@thingsboard.org", "tenant");
|
||||
device = testRestClient.postDevice("", defaultDevicePrototype("http_"));
|
||||
}
|
||||
|
||||
@AfterMethod
|
||||
public void tearDown() {
|
||||
testRestClient.deleteDeviceIfExists(device.getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void provisionRequestForDeviceWithPreProvisionedStrategy() throws Exception {
|
||||
|
||||
DeviceProfile deviceProfile = testRestClient.getDeviceProfileById(device.getDeviceProfileId());
|
||||
deviceProfile = updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES);
|
||||
|
||||
DeviceCredentials expectedDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
|
||||
|
||||
JsonNode provisionResponse = JacksonUtil.fromBytes(createCoapClientAndPublish(device.getName()));
|
||||
|
||||
assertThat(provisionResponse.get("credentialsType").asText()).isEqualTo(expectedDeviceCredentials.getCredentialsType().name());
|
||||
assertThat(provisionResponse.get("credentialsValue").asText()).isEqualTo(expectedDeviceCredentials.getCredentialsId());
|
||||
assertThat(provisionResponse.get("status").asText()).isEqualTo("SUCCESS");
|
||||
|
||||
updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void provisionRequestForDeviceWithAllowToCreateNewDevicesStrategy() throws Exception {
|
||||
|
||||
String testDeviceName = "test_provision_device";
|
||||
|
||||
DeviceProfile deviceProfile = testRestClient.getDeviceProfileById(device.getDeviceProfileId());
|
||||
|
||||
deviceProfile = updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
|
||||
|
||||
JsonNode provisionResponse = JacksonUtil.fromBytes(createCoapClientAndPublish(testDeviceName));
|
||||
|
||||
testRestClient.deleteDeviceIfExists(device.getId());
|
||||
device = testRestClient.getDeviceByName(testDeviceName);
|
||||
|
||||
DeviceCredentials expectedDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
|
||||
|
||||
assertThat(provisionResponse.get("credentialsType").asText()).isEqualTo(expectedDeviceCredentials.getCredentialsType().name());
|
||||
assertThat(provisionResponse.get("credentialsValue").asText()).isEqualTo(expectedDeviceCredentials.getCredentialsId());
|
||||
assertThat(provisionResponse.get("status").asText()).isEqualTo("SUCCESS");
|
||||
|
||||
updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void provisionRequestForDeviceWithDisabledProvisioningStrategy() throws Exception {
|
||||
|
||||
JsonObject provisionRequest = new JsonObject();
|
||||
provisionRequest.addProperty("provisionDeviceKey", TEST_PROVISION_DEVICE_KEY);
|
||||
provisionRequest.addProperty("provisionDeviceSecret", TEST_PROVISION_DEVICE_SECRET);
|
||||
|
||||
JsonNode response = JacksonUtil.fromBytes(createCoapClientAndPublish(null));
|
||||
|
||||
assertThat(response.get("status").asText()).isEqualTo("NOT_FOUND");
|
||||
}
|
||||
|
||||
private byte[] createCoapClientAndPublish(String deviceName) throws Exception {
|
||||
String provisionRequestMsg = createTestProvisionMessage(deviceName);
|
||||
client = new TestCoapClient(TestCoapClient.getFeatureTokenUrl(FeatureType.PROVISION));
|
||||
return client.postMethod(provisionRequestMsg.getBytes()).getPayload();
|
||||
}
|
||||
|
||||
private String createTestProvisionMessage(String deviceName) {
|
||||
ObjectNode provisionRequest = JacksonUtil.newObjectNode();
|
||||
provisionRequest.put("provisionDeviceKey", TEST_PROVISION_DEVICE_KEY);
|
||||
provisionRequest.put("provisionDeviceSecret", TEST_PROVISION_DEVICE_SECRET);
|
||||
if (deviceName != null) {
|
||||
provisionRequest.put("deviceName", deviceName);
|
||||
}
|
||||
return provisionRequest.toString();
|
||||
}
|
||||
|
||||
}
|
||||
@ -16,10 +16,14 @@
|
||||
package org.thingsboard.server.msa.connectivity;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.gson.JsonObject;
|
||||
import io.restassured.path.json.JsonPath;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||
import org.thingsboard.server.msa.AbstractContainerTest;
|
||||
import org.thingsboard.server.msa.WsClient;
|
||||
@ -69,8 +73,8 @@ public class HttpClientTest extends AbstractContainerTest {
|
||||
String accessToken = testRestClient.getDeviceCredentialsByDeviceId(device.getId()).getCredentialsId();
|
||||
assertThat(accessToken).isNotNull();
|
||||
|
||||
JsonNode sharedAattribute = mapper.readTree(createPayload().toString());
|
||||
testRestClient.postTelemetryAttribute(DEVICE, device.getId(), SHARED_SCOPE, sharedAattribute);
|
||||
JsonNode sharedAttribute = mapper.readTree(createPayload().toString());
|
||||
testRestClient.postTelemetryAttribute(DEVICE, device.getId(), SHARED_SCOPE, sharedAttribute);
|
||||
|
||||
JsonNode clientAttribute = mapper.readTree(createPayload().toString());
|
||||
testRestClient.postAttribute(accessToken, clientAttribute);
|
||||
@ -78,11 +82,11 @@ public class HttpClientTest extends AbstractContainerTest {
|
||||
TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
|
||||
|
||||
JsonNode attributes = testRestClient.getAttributes(accessToken, null, null);
|
||||
assertThat(attributes.get("shared")).isEqualTo(sharedAattribute);
|
||||
assertThat(attributes.get("shared")).isEqualTo(sharedAttribute);
|
||||
assertThat(attributes.get("client")).isEqualTo(clientAttribute);
|
||||
|
||||
JsonNode attributes2 = testRestClient.getAttributes(accessToken, null, "stringKey");
|
||||
assertThat(attributes2.get("shared").get("stringKey")).isEqualTo(sharedAattribute.get("stringKey"));
|
||||
assertThat(attributes2.get("shared").get("stringKey")).isEqualTo(sharedAttribute.get("stringKey"));
|
||||
assertThat(attributes2.has("client")).isFalse();
|
||||
|
||||
JsonNode attributes3 = testRestClient.getAttributes(accessToken, "longKey,stringKey", null);
|
||||
@ -91,4 +95,77 @@ public class HttpClientTest extends AbstractContainerTest {
|
||||
assertThat(attributes3.get("client").get("longKey")).isEqualTo(clientAttribute.get("longKey"));
|
||||
assertThat(attributes3.get("client").get("stringKey")).isEqualTo(clientAttribute.get("stringKey"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void provisionRequestForDeviceWithPreProvisionedStrategy() throws Exception {
|
||||
|
||||
DeviceProfile deviceProfile = testRestClient.getDeviceProfileById(device.getDeviceProfileId());
|
||||
deviceProfile = updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES);
|
||||
|
||||
DeviceCredentials expectedDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
|
||||
|
||||
JsonObject provisionRequest = new JsonObject();
|
||||
provisionRequest.addProperty("provisionDeviceKey", TEST_PROVISION_DEVICE_KEY);
|
||||
provisionRequest.addProperty("provisionDeviceSecret", TEST_PROVISION_DEVICE_SECRET);
|
||||
provisionRequest.addProperty("deviceName", device.getName());
|
||||
|
||||
JsonPath provisionResponse = testRestClient.postProvisionRequest(provisionRequest.toString());
|
||||
|
||||
String credentialsType = provisionResponse.get("credentialsType");
|
||||
String credentialsValue = provisionResponse.get("credentialsValue");
|
||||
String status = provisionResponse.get("status");
|
||||
|
||||
assertThat(credentialsType).isEqualTo(expectedDeviceCredentials.getCredentialsType().name());
|
||||
assertThat(credentialsValue).isEqualTo(expectedDeviceCredentials.getCredentialsId());
|
||||
assertThat(status).isEqualTo("SUCCESS");
|
||||
|
||||
updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void provisionRequestForDeviceWithAllowToCreateNewDevicesStrategy() throws Exception {
|
||||
|
||||
String testDeviceName = "test_provision_device";
|
||||
|
||||
DeviceProfile deviceProfile = testRestClient.getDeviceProfileById(device.getDeviceProfileId());
|
||||
|
||||
deviceProfile = updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
|
||||
|
||||
JsonObject provisionRequest = new JsonObject();
|
||||
provisionRequest.addProperty("provisionDeviceKey", TEST_PROVISION_DEVICE_KEY);
|
||||
provisionRequest.addProperty("provisionDeviceSecret", TEST_PROVISION_DEVICE_SECRET);
|
||||
provisionRequest.addProperty("deviceName", testDeviceName);
|
||||
|
||||
JsonPath provisionResponse = testRestClient.postProvisionRequest(provisionRequest.toString());
|
||||
|
||||
String credentialsType = provisionResponse.get("credentialsType");
|
||||
String credentialsValue = provisionResponse.get("credentialsValue");
|
||||
String status = provisionResponse.get("status");
|
||||
|
||||
testRestClient.deleteDeviceIfExists(device.getId());
|
||||
device = testRestClient.getDeviceByName(testDeviceName);
|
||||
|
||||
DeviceCredentials expectedDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
|
||||
|
||||
assertThat(credentialsType).isEqualTo(expectedDeviceCredentials.getCredentialsType().name());
|
||||
assertThat(credentialsValue).isEqualTo(expectedDeviceCredentials.getCredentialsId());
|
||||
assertThat(status).isEqualTo("SUCCESS");
|
||||
|
||||
updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void provisionRequestForDeviceWithDisabledProvisioningStrategy() throws Exception {
|
||||
|
||||
JsonObject provisionRequest = new JsonObject();
|
||||
provisionRequest.addProperty("provisionDeviceKey", TEST_PROVISION_DEVICE_KEY);
|
||||
provisionRequest.addProperty("provisionDeviceSecret", TEST_PROVISION_DEVICE_SECRET);
|
||||
|
||||
JsonPath provisionResponse = testRestClient.postProvisionRequest(provisionRequest.toString());
|
||||
|
||||
String status = provisionResponse.get("status");
|
||||
|
||||
assertThat(status).isEqualTo("NOT_FOUND");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -34,6 +34,8 @@ import org.thingsboard.mqtt.MqttClientConfig;
|
||||
import org.thingsboard.mqtt.MqttHandler;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
@ -322,6 +324,104 @@ public class MqttClientTest extends AbstractContainerTest {
|
||||
assertThat(mqttClient.isConnected()).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void provisionRequestForDeviceWithPreProvisionedStrategy() throws Exception {
|
||||
|
||||
DeviceProfile deviceProfile = testRestClient.getDeviceProfileById(device.getDeviceProfileId());
|
||||
deviceProfile = updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.CHECK_PRE_PROVISIONED_DEVICES);
|
||||
|
||||
DeviceCredentials expectedDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
|
||||
|
||||
MqttMessageListener listener = new MqttMessageListener();
|
||||
MqttClient mqttClient = getMqttClient("provision", listener);
|
||||
|
||||
JsonObject provisionRequest = new JsonObject();
|
||||
provisionRequest.addProperty("provisionDeviceKey", TEST_PROVISION_DEVICE_KEY);
|
||||
provisionRequest.addProperty("provisionDeviceSecret", TEST_PROVISION_DEVICE_SECRET);
|
||||
provisionRequest.addProperty("deviceName", device.getName());
|
||||
|
||||
mqttClient.publish("/provision/request", Unpooled.wrappedBuffer(provisionRequest.toString().getBytes())).get();
|
||||
|
||||
//Wait for response
|
||||
TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
|
||||
|
||||
MqttEvent provisionResponseMsg = listener.getEvents().poll(timeoutMultiplier, TimeUnit.SECONDS);
|
||||
|
||||
assertThat(provisionResponseMsg).isNotNull();
|
||||
|
||||
JsonNode provisionResponse = mapper.readTree(provisionResponseMsg.getMessage());
|
||||
|
||||
assertThat(provisionResponse.get("credentialsType").asText()).isEqualTo(expectedDeviceCredentials.getCredentialsType().name());
|
||||
assertThat(provisionResponse.get("credentialsValue").asText()).isEqualTo(expectedDeviceCredentials.getCredentialsId());
|
||||
assertThat(provisionResponse.get("status").asText()).isEqualTo("SUCCESS");
|
||||
|
||||
updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void provisionRequestForDeviceWithAllowToCreateNewDevicesStrategy() throws Exception {
|
||||
|
||||
String testDeviceName = "test_provision_device";
|
||||
|
||||
DeviceProfile deviceProfile = testRestClient.getDeviceProfileById(device.getDeviceProfileId());
|
||||
|
||||
deviceProfile = updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
|
||||
|
||||
MqttMessageListener listener = new MqttMessageListener();
|
||||
MqttClient mqttClient = getMqttClient("provision", listener);
|
||||
|
||||
JsonObject provisionRequest = new JsonObject();
|
||||
provisionRequest.addProperty("provisionDeviceKey", TEST_PROVISION_DEVICE_KEY);
|
||||
provisionRequest.addProperty("provisionDeviceSecret", TEST_PROVISION_DEVICE_SECRET);
|
||||
provisionRequest.addProperty("deviceName", testDeviceName);
|
||||
|
||||
mqttClient.publish("/provision/request", Unpooled.wrappedBuffer(provisionRequest.toString().getBytes())).get();
|
||||
|
||||
//Wait for response
|
||||
TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
|
||||
|
||||
MqttEvent provisionResponseMsg = listener.getEvents().poll(timeoutMultiplier, TimeUnit.SECONDS);
|
||||
|
||||
assertThat(provisionResponseMsg).isNotNull();
|
||||
|
||||
JsonNode provisionResponse = mapper.readTree(provisionResponseMsg.getMessage());
|
||||
|
||||
testRestClient.deleteDeviceIfExists(device.getId());
|
||||
device = testRestClient.getDeviceByName(testDeviceName);
|
||||
|
||||
DeviceCredentials expectedDeviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(device.getId());
|
||||
|
||||
assertThat(provisionResponse.get("credentialsType").asText()).isEqualTo(expectedDeviceCredentials.getCredentialsType().name());
|
||||
assertThat(provisionResponse.get("credentialsValue").asText()).isEqualTo(expectedDeviceCredentials.getCredentialsId());
|
||||
assertThat(provisionResponse.get("status").asText()).isEqualTo("SUCCESS");
|
||||
|
||||
updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void provisionRequestForDeviceWithDisabledProvisioningStrategy() throws Exception {
|
||||
|
||||
MqttMessageListener listener = new MqttMessageListener();
|
||||
MqttClient mqttClient = getMqttClient("provision", listener);
|
||||
|
||||
JsonObject provisionRequest = new JsonObject();
|
||||
provisionRequest.addProperty("provisionDeviceKey", TEST_PROVISION_DEVICE_KEY);
|
||||
provisionRequest.addProperty("provisionDeviceSecret", TEST_PROVISION_DEVICE_SECRET);
|
||||
|
||||
mqttClient.publish("/provision/request", Unpooled.wrappedBuffer(provisionRequest.toString().getBytes())).get();
|
||||
|
||||
//Wait for response
|
||||
TimeUnit.SECONDS.sleep(3 * timeoutMultiplier);
|
||||
|
||||
MqttEvent provisionResponseMsg = listener.getEvents().poll(timeoutMultiplier, TimeUnit.SECONDS);
|
||||
|
||||
assertThat(provisionResponseMsg).isNotNull();
|
||||
|
||||
JsonNode provisionResponse = mapper.readTree(provisionResponseMsg.getMessage());
|
||||
|
||||
assertThat(provisionResponse.get("status").asText()).isEqualTo("NOT_FOUND");
|
||||
}
|
||||
|
||||
private RuleChainId createRootRuleChainForRpcResponse() throws Exception {
|
||||
RuleChain newRuleChain = new RuleChain();
|
||||
newRuleChain.setName("testRuleChain");
|
||||
@ -356,9 +456,13 @@ public class MqttClientTest extends AbstractContainerTest {
|
||||
}
|
||||
|
||||
private MqttClient getMqttClient(DeviceCredentials deviceCredentials, MqttMessageListener listener) throws InterruptedException, ExecutionException {
|
||||
return getMqttClient(deviceCredentials.getCredentialsId(), listener);
|
||||
}
|
||||
|
||||
private MqttClient getMqttClient(String username, MqttMessageListener listener) throws InterruptedException, ExecutionException {
|
||||
MqttClientConfig clientConfig = new MqttClientConfig();
|
||||
clientConfig.setClientId("MQTT client from test");
|
||||
clientConfig.setUsername(deviceCredentials.getCredentialsId());
|
||||
clientConfig.setUsername(username);
|
||||
MqttClient mqttClient = MqttClient.create(clientConfig, listener);
|
||||
mqttClient.connect("localhost", 1883).get();
|
||||
return mqttClient;
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>msa</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.msa</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>thingsboard</artifactId>
|
||||
</parent>
|
||||
<artifactId>msa</artifactId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>msa</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.msa</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>msa</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.msa</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard.msa</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>transport</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.msa.transport</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard.msa</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>transport</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.msa.transport</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard.msa</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>transport</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.msa.transport</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard.msa</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>transport</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.msa.transport</groupId>
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.thingsboard</groupId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
<artifactId>msa</artifactId>
|
||||
</parent>
|
||||
<groupId>org.thingsboard.msa</groupId>
|
||||
|
||||
@ -21,7 +21,7 @@
|
||||
<parent>
|
||||
<groupId>org.thingsboard.msa</groupId>
|
||||
<artifactId>transport</artifactId>
|
||||
<version>3.4.2-SNAPSHOT</version>
|
||||
<version>3.4.2</version>
|
||||
</parent>
|
||||
|
||||
<groupId>org.thingsboard.msa.transport</groupId>
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Loading…
x
Reference in New Issue
Block a user