Merge branch 'master' into alarm-state-fix

This commit is contained in:
Volodymyr Babak 2024-05-09 12:49:48 +03:00 committed by GitHub
commit 6556af86a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
43 changed files with 946 additions and 926 deletions

View File

@ -33,7 +33,6 @@ import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials;
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MClientCredential;
import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MSecurityMode;
import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration;
import org.thingsboard.server.common.data.device.data.DeviceData;
import org.thingsboard.server.common.data.device.data.PowerMode;
import org.thingsboard.server.common.data.device.data.SnmpDeviceTransportConfiguration;
@ -68,6 +67,8 @@ import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.eclipse.leshan.core.LwM2m.Version.V1_0;
@Service
@TbCoreComponent
@RequiredArgsConstructor
@ -256,7 +257,7 @@ public class DeviceBulkImportService extends AbstractBulkImportService<Device> {
Lwm2mDeviceProfileTransportConfiguration transportConfiguration = new Lwm2mDeviceProfileTransportConfiguration();
transportConfiguration.setBootstrap(Collections.emptyList());
transportConfiguration.setClientLwM2mSettings(new OtherConfiguration(1, 1, 1, PowerMode.DRX, null, null, null, null, null));
transportConfiguration.setClientLwM2mSettings(new OtherConfiguration(1, 1, 1, PowerMode.DRX, null, null, null, null, null, V1_0.toString()));
transportConfiguration.setObserveAttr(new TelemetryMappingConfiguration(Collections.emptyMap(), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), Collections.emptyMap()));
DeviceProfileData deviceProfileData = new DeviceProfileData();

View File

@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.env.Environment;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.User;
@ -39,11 +40,15 @@ import org.thingsboard.server.service.telemetry.AlarmSubscriptionService;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@Slf4j
public abstract class AbstractTbEntityService {
@Autowired
private Environment env;
@Value("${server.log_controller_error_stack_trace}")
@Getter
private boolean logControllerErrorStackTrace;
@ -67,6 +72,10 @@ public abstract class AbstractTbEntityService {
@Lazy
private EntitiesVersionControlService vcService;
protected boolean isTestProfile() {
return Set.of(this.env.getActiveProfiles()).contains("test");
}
protected <T> T checkNotNull(T reference) throws ThingsboardException {
return checkNotNull(reference, "Requested item wasn't found!");
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.entitiy.tenant;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Value;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.TenantId;
@ -52,7 +53,9 @@ public class DefaultTbTenantService extends AbstractTbEntityService implements T
Tenant savedTenant = tenantService.saveTenant(tenant, tenantId -> {
installScripts.createDefaultRuleChains(tenantId);
installScripts.createDefaultEdgeRuleChains(tenantId);
installScripts.createDefaultTenantDashboards(tenantId, null);
if (!isTestProfile()) {
installScripts.createDefaultTenantDashboards(tenantId, null);
}
});
tenantProfileCache.evict(savedTenant.getId());

View File

@ -27,11 +27,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperService
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.housekeeper.HousekeeperConfig;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.queue.consumer.QueueConsumerManager;
import javax.annotation.PreDestroy;
import java.util.LinkedHashSet;

View File

@ -27,13 +27,13 @@ import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.gen.transport.TransportProtos.ToHousekeeperServiceMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.housekeeper.HousekeeperConfig;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.housekeeper.processor.HousekeeperTaskProcessor;
import org.thingsboard.server.service.housekeeper.stats.HousekeeperStatsService;
import org.thingsboard.server.service.queue.consumer.QueueConsumerManager;
import javax.annotation.PreDestroy;
import java.util.List;

View File

@ -317,7 +317,8 @@ public class InstallScripts {
@SneakyThrows
public void loadSystemImages() {
log.info("Loading system images...");
Stream<Path> dashboardsFiles = Files.list(Paths.get(getDataDir(), JSON_DIR, DEMO_DIR, DASHBOARDS_DIR));
Stream<Path> dashboardsFiles = Stream.concat(Files.list(Paths.get(getDataDir(), JSON_DIR, DEMO_DIR, DASHBOARDS_DIR)),
Files.list(Paths.get(getDataDir(), JSON_DIR, TENANT_DIR, DASHBOARDS_DIR)));
try (dashboardsFiles) {
dashboardsFiles.forEach(file -> {
try {

View File

@ -20,12 +20,12 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@ -43,6 +43,7 @@ import org.thingsboard.server.common.data.id.NotificationRequestId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
@ -78,10 +79,11 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceM
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.edge.EdgeNotificationService;
@ -89,6 +91,7 @@ import org.thingsboard.server.service.notification.NotificationSchedulerService;
import org.thingsboard.server.service.ota.OtaPackageStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.resource.TbImageService;
@ -109,7 +112,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -122,9 +124,11 @@ import java.util.stream.Collectors;
public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCoreNotificationMsg> implements TbCoreConsumerService {
@Value("${queue.core.poll-interval}")
private long pollDuration;
private long pollInterval;
@Value("${queue.core.pack-processing-timeout}")
private long packProcessingTimeout;
@Value("${queue.core.consumer-per-partition:true}")
private boolean consumerPerPartition;
@Value("${queue.core.stats.enabled:false}")
private boolean statsEnabled;
@ -133,7 +137,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
@Value("${queue.core.ota.pack-size:100}")
private int firmwarePackSize;
private final TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> mainConsumer;
private final DeviceStateService stateService;
private final TbApiUsageStateService statsService;
private final TbLocalSubscriptionService localSubscriptionService;
@ -144,14 +147,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private final GitVersionControlQueueService vcQueueService;
private final NotificationSchedulerService notificationSchedulerService;
private final NotificationRuleProcessor notificationRuleProcessor;
private final TbCoreConsumerStats stats;
protected final TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
private final TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> firmwareStatesConsumer;
private final TbCoreQueueFactory queueFactory;
private final TbImageService imageService;
private final TbCoreConsumerStats stats;
private MainQueueConsumerManager<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig> mainConsumer;
private QueueConsumerManager<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
private QueueConsumerManager<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> firmwareStatesConsumer;
protected volatile ExecutorService consumersExecutor;
protected volatile ExecutorService usageStatsExecutor;
private volatile ExecutorService firmwareStatesExecutor;
private volatile ListeningExecutorService deviceActivityEventsExecutor;
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory,
@ -176,10 +179,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
NotificationRuleProcessor notificationRuleProcessor,
TbImageService imageService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService,
eventPublisher, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer(), jwtSettingsService);
this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer();
this.firmwareStatesConsumer = tbCoreQueueFactory.createToOtaPackageStateServiceMsgConsumer();
eventPublisher, jwtSettingsService);
this.stateService = stateService;
this.localSubscriptionService = localSubscriptionService;
this.subscriptionManagerService = subscriptionManagerService;
@ -192,152 +192,146 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
this.notificationSchedulerService = notificationSchedulerService;
this.notificationRuleProcessor = notificationRuleProcessor;
this.imageService = imageService;
this.queueFactory = tbCoreQueueFactory;
}
@PostConstruct
public void init() {
super.init("tb-core-notifications-consumer");
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-core-consumer"));
this.usageStatsExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-usage-stats-consumer"));
this.firmwareStatesExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-firmware-notifications-consumer"));
super.init("tb-core");
this.deviceActivityEventsExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-device-activity-events-executor")));
this.mainConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig>builder()
.queueKey(new QueueKey(ServiceType.TB_CORE))
.config(CoreQueueConfig.of(consumerPerPartition, (int) pollInterval))
.msgPackProcessor(this::processMsgs)
.consumerCreator(config -> queueFactory.createToCoreMsgConsumer())
.consumerExecutor(consumersExecutor)
.scheduler(scheduler)
.taskExecutor(mgmtExecutor)
.build();
this.usageStatsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToUsageStatsServiceMsg>>builder()
.name("TB Usage Stats")
.msgPackProcessor(this::processUsageStatsMsg)
.pollInterval(pollInterval)
.consumerCreator(queueFactory::createToUsageStatsServiceMsgConsumer)
.consumerExecutor(consumersExecutor)
.threadPrefix("usage-stats")
.build();
this.firmwareStatesConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>>builder()
.name("TB Ota Package States")
.msgPackProcessor(this::processFirmwareMsgs)
.pollInterval(pollInterval)
.consumerCreator(queueFactory::createToOtaPackageStateServiceMsgConsumer)
.consumerExecutor(consumersExecutor)
.threadPrefix("firmware")
.build();
}
@PreDestroy
public void destroy() {
super.destroy();
if (consumersExecutor != null) {
consumersExecutor.shutdownNow();
}
if (usageStatsExecutor != null) {
usageStatsExecutor.shutdownNow();
}
if (firmwareStatesExecutor != null) {
firmwareStatesExecutor.shutdownNow();
}
if (deviceActivityEventsExecutor != null) {
deviceActivityEventsExecutor.shutdownNow();
}
}
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void onApplicationEvent(ApplicationReadyEvent event) {
super.onApplicationEvent(event);
launchUsageStatsConsumer();
launchOtaPackageUpdateNotificationConsumer();
@Override
protected void startConsumers() {
super.startConsumers();
firmwareStatesConsumer.subscribe();
firmwareStatesConsumer.launch();
usageStatsConsumer.launch();
}
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
log.info("Subscribing to partitions: {}", event.getPartitions());
this.mainConsumer.subscribe(event.getPartitions());
this.usageStatsConsumer.subscribe(
event
.getPartitions()
.stream()
.map(tpi -> tpi.newByTopic(usageStatsConsumer.getTopic()))
.collect(Collectors.toSet()));
this.firmwareStatesConsumer.subscribe();
mainConsumer.update(event.getPartitions());
usageStatsConsumer.subscribe(event.getPartitions()
.stream()
.map(tpi -> tpi.newByTopic(usageStatsConsumer.getConsumer().getTopic()))
.collect(Collectors.toSet()));
}
@Override
protected void launchMainConsumers() {
consumersExecutor.submit(() -> {
while (!stopped) {
private void processMsgs(List<TbProtoQueueMsg<ToCoreMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> consumer, CoreQueueConfig config) throws Exception {
List<IdMsgPair<ToCoreMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).collect(Collectors.toList());
ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> pendingMap = orderedMsgList.stream().collect(
Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<ToCoreMsg>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
Future<?> packSubmitFuture = consumersExecutor.submit(() -> {
orderedMsgList.forEach((element) -> {
UUID id = element.getUuid();
TbProtoQueueMsg<ToCoreMsg> msg = element.getMsg();
log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
TbCallback callback = new TbPackCallback<>(id, ctx);
try {
List<TbProtoQueueMsg<ToCoreMsg>> msgs = mainConsumer.poll(pollDuration);
if (msgs.isEmpty()) {
continue;
}
List<IdMsgPair<ToCoreMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).collect(Collectors.toList());
ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> pendingMap = orderedMsgList.stream().collect(
Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<ToCoreMsg>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
Future<?> packSubmitFuture = consumersExecutor.submit(() -> {
orderedMsgList.forEach((element) -> {
UUID id = element.getUuid();
TbProtoQueueMsg<ToCoreMsg> msg = element.getMsg();
log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
TbCallback callback = new TbPackCallback<>(id, ctx);
try {
ToCoreMsg toCoreMsg = msg.getValue();
pendingMsgHolder.setToCoreMsg(toCoreMsg);
if (toCoreMsg.hasToSubscriptionMgrMsg()) {
log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
} else if (toCoreMsg.hasToDeviceActorMsg()) {
log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
} else if (toCoreMsg.hasDeviceStateServiceMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
} else if (toCoreMsg.hasEdgeNotificationMsg()) {
log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg());
forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback);
} else if (toCoreMsg.hasDeviceConnectMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceConnectMsg());
forwardToStateService(toCoreMsg.getDeviceConnectMsg(), callback);
} else if (toCoreMsg.hasDeviceActivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceActivityMsg());
forwardToStateService(toCoreMsg.getDeviceActivityMsg(), callback);
} else if (toCoreMsg.hasDeviceDisconnectMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceDisconnectMsg());
forwardToStateService(toCoreMsg.getDeviceDisconnectMsg(), callback);
} else if (toCoreMsg.hasDeviceInactivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceInactivityMsg());
forwardToStateService(toCoreMsg.getDeviceInactivityMsg(), callback);
} else if (toCoreMsg.hasToDeviceActorNotification()) {
TbActorMsg actorMsg = ProtoUtils.fromProto(toCoreMsg.getToDeviceActorNotification());
if (actorMsg != null) {
if (actorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) {
tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) actorMsg);
} else {
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg);
actorContext.tell(actorMsg);
}
}
callback.onSuccess();
} else if (toCoreMsg.hasNotificationSchedulerServiceMsg()) {
TransportProtos.NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = toCoreMsg.getNotificationSchedulerServiceMsg();
log.trace("[{}] Forwarding message to notification scheduler service {}", id, toCoreMsg.getNotificationSchedulerServiceMsg());
forwardToNotificationSchedulerService(notificationSchedulerServiceMsg, callback);
} else if (toCoreMsg.hasErrorEventMsg()) {
forwardToEventService(toCoreMsg.getErrorEventMsg(), callback);
} else if (toCoreMsg.hasLifecycleEventMsg()) {
forwardToEventService(toCoreMsg.getLifecycleEventMsg(), callback);
}
} catch (Throwable e) {
log.warn("[{}] Failed to process message: {}", id, msg, e);
callback.onFailure(e);
ToCoreMsg toCoreMsg = msg.getValue();
pendingMsgHolder.setToCoreMsg(toCoreMsg);
if (toCoreMsg.hasToSubscriptionMgrMsg()) {
log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
} else if (toCoreMsg.hasToDeviceActorMsg()) {
log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
} else if (toCoreMsg.hasDeviceStateServiceMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceStateServiceMsg());
forwardToStateService(toCoreMsg.getDeviceStateServiceMsg(), callback);
} else if (toCoreMsg.hasEdgeNotificationMsg()) {
log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg());
forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback);
} else if (toCoreMsg.hasDeviceConnectMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceConnectMsg());
forwardToStateService(toCoreMsg.getDeviceConnectMsg(), callback);
} else if (toCoreMsg.hasDeviceActivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceActivityMsg());
forwardToStateService(toCoreMsg.getDeviceActivityMsg(), callback);
} else if (toCoreMsg.hasDeviceDisconnectMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceDisconnectMsg());
forwardToStateService(toCoreMsg.getDeviceDisconnectMsg(), callback);
} else if (toCoreMsg.hasDeviceInactivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceInactivityMsg());
forwardToStateService(toCoreMsg.getDeviceInactivityMsg(), callback);
} else if (toCoreMsg.hasToDeviceActorNotification()) {
TbActorMsg actorMsg = ProtoUtils.fromProto(toCoreMsg.getToDeviceActorNotification());
if (actorMsg != null) {
if (actorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) {
tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) actorMsg);
} else {
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg);
actorContext.tell(actorMsg);
}
});
});
if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
if (!packSubmitFuture.isDone()) {
packSubmitFuture.cancel(true);
ToCoreMsg lastSubmitMsg = pendingMsgHolder.getToCoreMsg();
log.info("Timeout to process message: {}", lastSubmitMsg);
}
ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
}
mainConsumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain messages from queue.", e);
try {
Thread.sleep(pollDuration);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
callback.onSuccess();
} else if (toCoreMsg.hasNotificationSchedulerServiceMsg()) {
TransportProtos.NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = toCoreMsg.getNotificationSchedulerServiceMsg();
log.trace("[{}] Forwarding message to notification scheduler service {}", id, toCoreMsg.getNotificationSchedulerServiceMsg());
forwardToNotificationSchedulerService(notificationSchedulerServiceMsg, callback);
} else if (toCoreMsg.hasErrorEventMsg()) {
forwardToEventService(toCoreMsg.getErrorEventMsg(), callback);
} else if (toCoreMsg.hasLifecycleEventMsg()) {
forwardToEventService(toCoreMsg.getLifecycleEventMsg(), callback);
}
} catch (Throwable e) {
log.warn("[{}] Failed to process message: {}", id, msg, e);
callback.onFailure(e);
}
}
log.info("TB Core Consumer stopped.");
});
});
if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
if (!packSubmitFuture.isDone()) {
packSubmitFuture.cancel(true);
ToCoreMsg lastSubmitMsg = pendingMsgHolder.getToCoreMsg();
log.info("Timeout to process message: {}", lastSubmitMsg);
}
if (log.isDebugEnabled()) {
ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
}
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
}
consumer.commit();
}
private static class PendingMsgHolder {
@ -353,7 +347,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
@Override
protected long getNotificationPollDuration() {
return pollDuration;
return pollInterval;
}
@Override
@ -361,6 +355,16 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
return packProcessingTimeout;
}
@Override
protected int getMgmtThreadPoolSize() {
return Math.max(Runtime.getRuntime().availableProcessors(), 4);
}
@Override
protected TbQueueConsumer<TbProtoQueueMsg<ToCoreNotificationMsg>> createNotificationsConsumer() {
return queueFactory.createToCoreNotificationsMsgConsumer();
}
@Override
protected void handleNotification(UUID id, TbProtoQueueMsg<ToCoreNotificationMsg> msg, TbCallback callback) {
ToCoreNotificationMsg toCoreNotification = msg.getValue();
@ -409,92 +413,55 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
}
private void launchUsageStatsConsumer() {
usageStatsExecutor.submit(() -> {
while (!stopped) {
try {
List<TbProtoQueueMsg<ToUsageStatsServiceMsg>> msgs = usageStatsConsumer.poll(getNotificationPollDuration());
if (msgs.isEmpty()) {
continue;
}
ConcurrentMap<UUID, TbProtoQueueMsg<ToUsageStatsServiceMsg>> pendingMap = msgs.stream().collect(
Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<ToUsageStatsServiceMsg>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
pendingMap.forEach((id, msg) -> {
log.trace("[{}] Creating usage stats callback for message: {}", id, msg.getValue());
TbCallback callback = new TbPackCallback<>(id, ctx);
try {
handleUsageStats(msg, callback);
} catch (Throwable e) {
log.warn("[{}] Failed to process usage stats: {}", id, msg, e);
callback.onFailure(e);
}
});
if (!processingTimeoutLatch.await(getNotificationPackProcessingTimeout(), TimeUnit.MILLISECONDS)) {
ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process usage stats: {}", id, msg.getValue()));
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process usage stats: {}", id, msg.getValue()));
}
usageStatsConsumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain usage stats from queue.", e);
try {
Thread.sleep(getNotificationPollDuration());
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new usage stats", e2);
}
}
}
private void processUsageStatsMsg(List<TbProtoQueueMsg<ToUsageStatsServiceMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> consumer) throws Exception {
ConcurrentMap<UUID, TbProtoQueueMsg<ToUsageStatsServiceMsg>> pendingMap = msgs.stream().collect(
Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity()));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<ToUsageStatsServiceMsg>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
pendingMap.forEach((id, msg) -> {
log.trace("[{}] Creating usage stats callback for message: {}", id, msg.getValue());
TbCallback callback = new TbPackCallback<>(id, ctx);
try {
handleUsageStats(msg, callback);
} catch (Throwable e) {
log.warn("[{}] Failed to process usage stats: {}", id, msg, e);
callback.onFailure(e);
}
log.info("TB Usage Stats Consumer stopped.");
});
if (!processingTimeoutLatch.await(getNotificationPackProcessingTimeout(), TimeUnit.MILLISECONDS)) {
ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process usage stats: {}", id, msg.getValue()));
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process usage stats: {}", id, msg.getValue()));
}
consumer.commit();
}
private void launchOtaPackageUpdateNotificationConsumer() {
private void processFirmwareMsgs(List<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> consumer) {
long maxProcessingTimeoutPerRecord = firmwarePackInterval / firmwarePackSize;
firmwareStatesExecutor.submit(() -> {
while (!stopped) {
try {
List<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> msgs = firmwareStatesConsumer.poll(getNotificationPollDuration());
if (msgs.isEmpty()) {
continue;
}
long timeToSleep = maxProcessingTimeoutPerRecord;
for (TbProtoQueueMsg<ToOtaPackageStateServiceMsg> msg : msgs) {
try {
long startTime = System.currentTimeMillis();
boolean isSuccessUpdate = handleOtaPackageUpdates(msg);
long endTime = System.currentTimeMillis();
long spentTime = endTime - startTime;
timeToSleep = timeToSleep - spentTime;
if (isSuccessUpdate) {
if (timeToSleep > 0) {
log.debug("Spent time per record is: [{}]!", spentTime);
Thread.sleep(timeToSleep);
timeToSleep = 0;
}
timeToSleep += maxProcessingTimeoutPerRecord;
}
} catch (Throwable e) {
log.warn("Failed to process firmware update msg: {}", msg, e);
}
}
firmwareStatesConsumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain usage stats from queue.", e);
try {
Thread.sleep(getNotificationPollDuration());
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new firmware updates", e2);
}
long timeToSleep = maxProcessingTimeoutPerRecord;
for (TbProtoQueueMsg<ToOtaPackageStateServiceMsg> msg : msgs) {
try {
long startTime = System.currentTimeMillis();
boolean isSuccessUpdate = handleOtaPackageUpdates(msg);
long endTime = System.currentTimeMillis();
long spentTime = endTime - startTime;
timeToSleep = timeToSleep - spentTime;
if (isSuccessUpdate) {
if (timeToSleep > 0) {
log.debug("Spent time per record is: [{}]!", spentTime);
Thread.sleep(timeToSleep);
timeToSleep = 0;
}
timeToSleep += maxProcessingTimeoutPerRecord;
}
} catch (InterruptedException e) {
return;
} catch (Throwable e) {
log.warn("Failed to process firmware update msg: {}", msg, e);
}
log.info("TB Ota Package States Consumer stopped.");
});
}
consumer.commit();
}
private void handleUsageStats(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) {
@ -782,15 +749,17 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
@Override
protected void stopConsumers() {
if (mainConsumer != null) {
mainConsumer.unsubscribe();
}
if (usageStatsConsumer != null) {
usageStatsConsumer.unsubscribe();
}
if (firmwareStatesConsumer != null) {
firmwareStatesConsumer.unsubscribe();
}
super.stopConsumers();
mainConsumer.stop();
mainConsumer.awaitStop();
usageStatsConsumer.stop();
firmwareStatesConsumer.stop();
}
@Data(staticConstructor = "of")
public static class CoreQueueConfig implements QueueConfig {
private final boolean consumerPerPartition;
private final int pollInterval;
}
}

View File

@ -15,8 +15,8 @@
*/
package org.thingsboard.server.service.queue;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled;
@ -39,12 +39,11 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.QueueDeleteMsg;
import org.thingsboard.server.gen.transport.TransportProtos.QueueUpdateMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
@ -55,8 +54,6 @@ import org.thingsboard.server.service.queue.ruleengine.TbRuleEngineQueueConsumer
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import jakarta.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@ -77,7 +74,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
private final ConcurrentMap<QueueKey, TbRuleEngineQueueConsumerManager> consumers = new ConcurrentHashMap<>();
public DefaultTbRuleEngineConsumerService(TbRuleEngineConsumerContext ctx,
TbRuleEngineQueueFactory tbRuleEngineQueueFactory,
ActorSystemContext actorContext,
TbRuleEngineDeviceRpcService tbDeviceRpcService,
QueueService queueService,
@ -88,8 +84,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
PartitionService partitionService,
ApplicationEventPublisher eventPublisher,
JwtSettingsService jwtSettingsService) {
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService,
eventPublisher, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer(), jwtSettingsService);
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, eventPublisher, jwtSettingsService);
this.ctx = ctx;
this.tbDeviceRpcService = tbDeviceRpcService;
this.queueService = queueService;
@ -97,7 +92,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
@PostConstruct
public void init() {
super.init("tb-rule-engine-notifications-consumer");
super.init("tb-rule-engine");
List<Queue> queues = queueService.findAllQueues();
for (Queue configuration : queues) {
if (partitionService.isManagedByCurrentService(configuration.getTenantId())) {
@ -137,20 +132,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
});
}
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void onApplicationEvent(ApplicationReadyEvent event) {
super.onApplicationEvent(event);
ctx.setReady(true);
}
@Override
protected void launchMainConsumers() {}
@Override
protected void stopConsumers() {
super.stopConsumers();
consumers.values().forEach(TbRuleEngineQueueConsumerManager::stop);
consumers.values().forEach(TbRuleEngineQueueConsumerManager::awaitStop);
ctx.stop();
}
@Override
@ -168,6 +154,16 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
return ctx.getPackProcessingTimeout();
}
@Override
protected int getMgmtThreadPoolSize() {
return ctx.getMgmtThreadPoolSize();
}
@Override
protected TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> createNotificationsConsumer() {
return ctx.getQueueFactory().createToRuleEngineNotificationsMsgConsumer();
}
@Override
protected void handleNotification(UUID id, TbProtoQueueMsg<ToRuleEngineNotificationMsg> msg, TbCallback callback) throws Exception {
ToRuleEngineNotificationMsg nfMsg = msg.getValue();
@ -244,7 +240,13 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
}
private TbRuleEngineQueueConsumerManager createConsumer(QueueKey queueKey, Queue queue) {
var consumer = new TbRuleEngineQueueConsumerManager(ctx, queueKey);
var consumer = TbRuleEngineQueueConsumerManager.create()
.ctx(ctx)
.queueKey(queueKey)
.consumerExecutor(consumersExecutor)
.scheduler(scheduler)
.taskExecutor(mgmtExecutor)
.build();
consumers.put(queueKey, consumer);
consumer.init(queue);
return consumer;

View File

@ -0,0 +1,322 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue.consumer;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.service.queue.ruleengine.QueueEvent;
import org.thingsboard.server.service.queue.ruleengine.TbQueueConsumerManagerTask;
import org.thingsboard.server.service.queue.ruleengine.TbQueueConsumerTask;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
public class MainQueueConsumerManager<M extends TbQueueMsg, C extends QueueConfig> {
protected final QueueKey queueKey;
@Getter
protected C config;
protected final MsgPackProcessor<M, C> msgPackProcessor;
protected final Function<C, TbQueueConsumer<M>> consumerCreator;
protected final ExecutorService consumerExecutor;
protected final ScheduledExecutorService scheduler;
protected final ExecutorService taskExecutor;
private final java.util.Queue<TbQueueConsumerManagerTask> tasks = new ConcurrentLinkedQueue<>();
private final ReentrantLock lock = new ReentrantLock();
@Getter
private volatile Set<TopicPartitionInfo> partitions;
protected volatile ConsumerWrapper<M> consumerWrapper;
protected volatile boolean stopped;
@Builder
public MainQueueConsumerManager(QueueKey queueKey, C config,
MsgPackProcessor<M, C> msgPackProcessor,
Function<C, TbQueueConsumer<M>> consumerCreator,
ExecutorService consumerExecutor,
ScheduledExecutorService scheduler,
ExecutorService taskExecutor) {
this.queueKey = queueKey;
this.config = config;
this.msgPackProcessor = msgPackProcessor;
this.consumerCreator = consumerCreator;
this.consumerExecutor = consumerExecutor;
this.scheduler = scheduler;
this.taskExecutor = taskExecutor;
if (config != null) {
init(config);
}
}
public void init(C config) {
this.config = config;
if (config.isConsumerPerPartition()) {
this.consumerWrapper = new ConsumerPerPartitionWrapper();
} else {
this.consumerWrapper = new SingleConsumerWrapper();
}
log.debug("[{}] Initialized consumer for queue: {}", queueKey, config);
}
public void update(C config) {
addTask(TbQueueConsumerManagerTask.configUpdate(config));
}
public void update(Set<TopicPartitionInfo> partitions) {
addTask(TbQueueConsumerManagerTask.partitionChange(partitions));
}
protected void addTask(TbQueueConsumerManagerTask todo) {
if (stopped) {
return;
}
tasks.add(todo);
log.trace("[{}] Added task: {}", queueKey, todo);
tryProcessTasks();
}
private void tryProcessTasks() {
taskExecutor.submit(() -> {
if (lock.tryLock()) {
try {
C newConfig = null;
Set<TopicPartitionInfo> newPartitions = null;
while (!stopped) {
TbQueueConsumerManagerTask task = tasks.poll();
if (task == null) {
break;
}
log.trace("[{}] Processing task: {}", queueKey, task);
if (task.getEvent() == QueueEvent.PARTITION_CHANGE) {
newPartitions = task.getPartitions();
} else if (task.getEvent() == QueueEvent.CONFIG_UPDATE) {
newConfig = (C) task.getConfig();
} else {
processTask(task);
}
}
if (stopped) {
return;
}
if (newConfig != null) {
doUpdate(newConfig);
}
if (newPartitions != null) {
doUpdate(newPartitions);
}
} catch (Exception e) {
log.error("[{}] Failed to process tasks", queueKey, e);
} finally {
lock.unlock();
}
} else {
log.trace("[{}] Failed to acquire lock", queueKey);
scheduler.schedule(this::tryProcessTasks, 1, TimeUnit.SECONDS);
}
});
}
protected void processTask(TbQueueConsumerManagerTask task) {
}
private void doUpdate(C newConfig) {
log.info("[{}] Processing queue update: {}", queueKey, newConfig);
var oldConfig = this.config;
this.config = newConfig;
if (log.isTraceEnabled()) {
log.trace("[{}] Old queue configuration: {}", queueKey, oldConfig);
log.trace("[{}] New queue configuration: {}", queueKey, newConfig);
}
if (oldConfig == null) {
init(config);
} else if (newConfig.isConsumerPerPartition() != oldConfig.isConsumerPerPartition()) {
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::initiateStop);
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion);
init(config);
if (partitions != null) {
doUpdate(partitions); // even if partitions number was changed, there can be no partition change event
}
} else {
log.trace("[{}] Silently applied new config, because consumer-per-partition not changed", queueKey);
// do nothing, because partitions change (if they changed) will be handled on PartitionChangeEvent,
// and changes to other config values will be picked up by consumer on the fly,
// and queue topic and name are immutable
}
}
private void doUpdate(Set<TopicPartitionInfo> partitions) {
this.partitions = partitions;
consumerWrapper.updatePartitions(partitions);
}
private void launchConsumer(TbQueueConsumerTask<M> consumerTask) {
log.info("[{}] Launching consumer", consumerTask.getKey());
Future<?> consumerLoop = consumerExecutor.submit(() -> {
ThingsBoardThreadFactory.updateCurrentThreadName(consumerTask.getKey().toString());
try {
consumerLoop(consumerTask.getConsumer());
} catch (Throwable e) {
log.error("Failure in consumer loop", e);
}
log.info("[{}] Consumer stopped", consumerTask.getKey());
});
consumerTask.setTask(consumerLoop);
}
private void consumerLoop(TbQueueConsumer<M> consumer) {
while (!stopped && !consumer.isStopped()) {
try {
List<M> msgs = consumer.poll(config.getPollInterval());
if (msgs.isEmpty()) {
continue;
}
processMsgs(msgs, consumer, config);
} catch (Exception e) {
if (!consumer.isStopped()) {
log.warn("Failed to process messages from queue", e);
try {
Thread.sleep(config.getPollInterval());
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
}
if (consumer.isStopped()) {
consumer.unsubscribe();
}
}
protected void processMsgs(List<M> msgs, TbQueueConsumer<M> consumer, C config) throws Exception {
msgPackProcessor.process(msgs, consumer, config);
}
public void stop() {
log.debug("[{}] Stopping consumers", queueKey);
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::initiateStop);
stopped = true;
}
public void awaitStop() {
log.debug("[{}] Waiting for consumers to stop", queueKey);
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion);
log.debug("[{}] Unsubscribed and stopped consumers", queueKey);
}
private static String partitionsToString(Collection<TopicPartitionInfo> partitions) {
return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.joining(", ", "[", "]"));
}
public interface MsgPackProcessor<M extends TbQueueMsg, C extends QueueConfig> {
void process(List<M> msgs, TbQueueConsumer<M> consumer, C config) throws Exception;
}
public interface ConsumerWrapper<M extends TbQueueMsg> {
void updatePartitions(Set<TopicPartitionInfo> partitions);
Collection<TbQueueConsumerTask<M>> getConsumers();
}
class ConsumerPerPartitionWrapper implements ConsumerWrapper<M> {
private final Map<TopicPartitionInfo, TbQueueConsumerTask<M>> consumers = new HashMap<>();
@Override
public void updatePartitions(Set<TopicPartitionInfo> partitions) {
Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions);
addedPartitions.removeAll(consumers.keySet());
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(consumers.keySet());
removedPartitions.removeAll(partitions);
log.info("[{}] Added partitions: {}, removed partitions: {}", queueKey, partitionsToString(addedPartitions), partitionsToString(removedPartitions));
removedPartitions.forEach((tpi) -> consumers.get(tpi).initiateStop());
removedPartitions.forEach((tpi) -> consumers.remove(tpi).awaitCompletion());
addedPartitions.forEach((tpi) -> {
String key = queueKey + "-" + tpi.getPartition().orElse(-1);
TbQueueConsumerTask<M> consumer = new TbQueueConsumerTask<>(key, consumerCreator.apply(config));
consumers.put(tpi, consumer);
consumer.subscribe(Set.of(tpi));
launchConsumer(consumer);
});
}
@Override
public Collection<TbQueueConsumerTask<M>> getConsumers() {
return consumers.values();
}
}
class SingleConsumerWrapper implements ConsumerWrapper<M> {
private TbQueueConsumerTask<M> consumer;
@Override
public void updatePartitions(Set<TopicPartitionInfo> partitions) {
log.info("[{}] New partitions: {}", queueKey, partitionsToString(partitions));
if (partitions.isEmpty()) {
if (consumer != null && consumer.isRunning()) {
consumer.initiateStop();
consumer.awaitCompletion();
}
consumer = null;
return;
}
if (consumer == null) {
consumer = new TbQueueConsumerTask<>(queueKey, consumerCreator.apply(config));
}
consumer.subscribe(partitions);
if (!consumer.isRunning()) {
launchConsumer(consumer);
}
}
@Override
public Collection<TbQueueConsumerTask<M>> getConsumers() {
if (consumer == null) {
return Collections.emptyList();
}
return List.of(consumer);
}
}
}

View File

@ -15,10 +15,11 @@
*/
package org.thingsboard.server.service.queue.processing;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.EntityType;
@ -36,6 +37,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
@ -47,9 +49,6 @@ import org.thingsboard.server.service.queue.TbPackCallback;
import org.thingsboard.server.service.queue.TbPackProcessingContext;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import jakarta.annotation.PreDestroy;
import org.thingsboard.server.service.security.model.token.JwtTokenFactory;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -57,6 +56,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -64,9 +64,6 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor
public abstract class AbstractConsumerService<N extends com.google.protobuf.GeneratedMessageV3> extends TbApplicationEventListener<PartitionChangeEvent> {
protected volatile ExecutorService notificationsConsumerExecutor;
protected volatile boolean stopped = false;
protected volatile boolean isReady = false;
protected final ActorSystemContext actorContext;
protected final TbTenantProfileCache tenantProfileCache;
protected final TbDeviceProfileCache deviceProfileCache;
@ -74,21 +71,37 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
protected final TbApiUsageStateService apiUsageStateService;
protected final PartitionService partitionService;
protected final ApplicationEventPublisher eventPublisher;
protected final TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer;
protected final JwtSettingsService jwtSettingsService;
public void init(String nfConsumerThreadName) {
this.notificationsConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(nfConsumerThreadName));
protected QueueConsumerManager<TbProtoQueueMsg<N>> nfConsumer;
protected ExecutorService consumersExecutor;
protected ExecutorService mgmtExecutor;
protected ScheduledExecutorService scheduler;
public void init(String prefix) {
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(prefix + "-consumer"));
this.mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(getMgmtThreadPoolSize(), prefix + "-mgmt");
this.scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(prefix + "-consumer-scheduler"));
this.nfConsumer = QueueConsumerManager.<TbProtoQueueMsg<N>>builder()
.name(getServiceType().getLabel() + " Notifications")
.msgPackProcessor(this::processNotifications)
.pollInterval(getNotificationPollDuration())
.consumerCreator(this::createNotificationsConsumer)
.consumerExecutor(consumersExecutor)
.threadPrefix("notifications")
.build();
}
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void onApplicationEvent(ApplicationReadyEvent event) {
log.info("Subscribing to notifications: {}", nfConsumer.getTopic());
this.nfConsumer.subscribe();
this.isReady = true;
launchNotificationsConsumer();
launchMainConsumers();
public void afterStartUp() {
startConsumers();
}
protected void startConsumers() {
nfConsumer.subscribe();
nfConsumer.launch();
}
@Override
@ -98,58 +111,42 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
protected abstract ServiceType getServiceType();
protected abstract void launchMainConsumers();
protected abstract void stopConsumers();
protected void stopConsumers() {
nfConsumer.stop();
}
protected abstract long getNotificationPollDuration();
protected abstract long getNotificationPackProcessingTimeout();
protected void launchNotificationsConsumer() {
notificationsConsumerExecutor.submit(() -> {
while (!stopped) {
try {
List<TbProtoQueueMsg<N>> msgs = nfConsumer.poll(getNotificationPollDuration());
if (msgs.isEmpty()) {
continue;
}
List<IdMsgPair<N>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).collect(Collectors.toList());
ConcurrentMap<UUID, TbProtoQueueMsg<N>> pendingMap = orderedMsgList.stream().collect(
Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<N>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
orderedMsgList.forEach(element -> {
UUID id = element.getUuid();
TbProtoQueueMsg<N> msg = element.getMsg();
log.trace("[{}] Creating notification callback for message: {}", id, msg.getValue());
TbCallback callback = new TbPackCallback<>(id, ctx);
try {
handleNotification(id, msg, callback);
} catch (Throwable e) {
log.warn("[{}] Failed to process notification: {}", id, msg, e);
callback.onFailure(e);
}
});
if (!processingTimeoutLatch.await(getNotificationPackProcessingTimeout(), TimeUnit.MILLISECONDS)) {
ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process notification: {}", id, msg.getValue()));
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process notification: {}", id, msg.getValue()));
}
nfConsumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain notifications from queue.", e);
try {
Thread.sleep(getNotificationPollDuration());
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new notifications", e2);
}
}
}
protected abstract int getMgmtThreadPoolSize();
protected abstract TbQueueConsumer<TbProtoQueueMsg<N>> createNotificationsConsumer();
protected void processNotifications(List<TbProtoQueueMsg<N>> msgs, TbQueueConsumer<TbProtoQueueMsg<N>> consumer) throws Exception {
List<IdMsgPair<N>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).collect(Collectors.toList());
ConcurrentMap<UUID, TbProtoQueueMsg<N>> pendingMap = orderedMsgList.stream().collect(
Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<N>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
orderedMsgList.forEach(element -> {
UUID id = element.getUuid();
TbProtoQueueMsg<N> msg = element.getMsg();
log.trace("[{}] Creating notification callback for message: {}", id, msg.getValue());
TbCallback callback = new TbPackCallback<>(id, ctx);
try {
handleNotification(id, msg, callback);
} catch (Throwable e) {
log.warn("[{}] Failed to process notification: {}", id, msg, e);
callback.onFailure(e);
}
log.info("TB Notifications Consumer stopped.");
});
if (!processingTimeoutLatch.await(getNotificationPackProcessingTimeout(), TimeUnit.MILLISECONDS)) {
ctx.getAckMap().forEach((id, msg) -> log.warn("[{}] Timeout to process notification: {}", id, msg.getValue()));
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process notification: {}", id, msg.getValue()));
}
consumer.commit();
}
protected final void handleComponentLifecycleMsg(UUID id, ComponentLifecycleMsg componentLifecycleMsg) {
@ -203,13 +200,15 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
@PreDestroy
public void destroy() {
stopped = true;
stopConsumers();
if (nfConsumer != null) {
nfConsumer.unsubscribe();
if (consumersExecutor != null) {
consumersExecutor.shutdownNow();
}
if (notificationsConsumerExecutor != null) {
notificationsConsumerExecutor.shutdownNow();
if (mgmtExecutor != null) {
mgmtExecutor.shutdownNow();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
}
}

View File

@ -18,7 +18,7 @@ package org.thingsboard.server.service.queue.ruleengine;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import java.util.Set;
@ -29,7 +29,7 @@ import java.util.Set;
public class TbQueueConsumerManagerTask {
private final QueueEvent event;
private Queue queue;
private QueueConfig config;
private Set<TopicPartitionInfo> partitions;
private boolean drainQueue;
@ -37,8 +37,8 @@ public class TbQueueConsumerManagerTask {
return new TbQueueConsumerManagerTask(QueueEvent.DELETE, null, null, drainQueue);
}
public static TbQueueConsumerManagerTask configUpdate(Queue queue) {
return new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, queue, null, false);
public static TbQueueConsumerManagerTask configUpdate(QueueConfig config) {
return new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, config, null, false);
}
public static TbQueueConsumerManagerTask partitionChange(Set<TopicPartitionInfo> partitions) {

View File

@ -20,9 +20,8 @@ import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.TbQueueMsg;
import java.util.Set;
import java.util.concurrent.Future;
@ -30,12 +29,12 @@ import java.util.concurrent.TimeUnit;
@RequiredArgsConstructor
@Slf4j
public class TbQueueConsumerTask {
public class TbQueueConsumerTask<M extends TbQueueMsg> {
@Getter
private final Object key;
@Getter
private final TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer;
private final TbQueueConsumer<M> consumer;
@Setter
private Future<?> task;

View File

@ -19,8 +19,6 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.queue.TbQueueAdmin;
@ -33,11 +31,6 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStr
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory;
import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Component
@TbRuleEngineComponent
@Slf4j
@ -68,22 +61,4 @@ public class TbRuleEngineConsumerContext {
private final TbQueueProducerProvider producerProvider;
private final TbQueueAdmin queueAdmin;
private ExecutorService consumersExecutor;
private ExecutorService mgmtExecutor;
private ScheduledExecutorService scheduler;
private volatile boolean isReady = false;
@PostConstruct
void init() {
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer"));
this.mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(mgmtThreadPoolSize, "tb-rule-engine-mgmt");
this.scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-scheduler"));
}
public void stop() {
scheduler.shutdownNow();
consumersExecutor.shutdownNow();
mgmtExecutor.shutdownNow();
}
}

View File

@ -16,9 +16,8 @@
package org.thingsboard.server.service.queue.ruleengine;
import com.google.protobuf.ProtocolStringList;
import lombok.Getter;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
@ -38,171 +37,52 @@ import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.service.queue.TbMsgPackCallback;
import org.thingsboard.server.service.queue.TbMsgPackProcessingContext;
import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats;
import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult;
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy;
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@Slf4j
public class TbRuleEngineQueueConsumerManager {
public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<TbProtoQueueMsg<ToRuleEngineMsg>, Queue> {
public static final String SUCCESSFUL_STATUS = "successful";
public static final String FAILED_STATUS = "failed";
private final TbRuleEngineConsumerContext ctx;
private final QueueKey queueKey;
private final TbRuleEngineConsumerStats stats;
private final ReentrantLock lock = new ReentrantLock(); //NonfairSync
@Getter
private volatile Queue queue;
@Getter
private volatile Set<TopicPartitionInfo> partitions;
private volatile ConsumerWrapper consumerWrapper;
private volatile boolean stopped;
private final java.util.Queue<TbQueueConsumerManagerTask> tasks = new ConcurrentLinkedQueue<>();
public TbRuleEngineQueueConsumerManager(TbRuleEngineConsumerContext ctx, QueueKey queueKey) {
@Builder(builderMethodName = "create") // not to conflict with super.builder()
public TbRuleEngineQueueConsumerManager(TbRuleEngineConsumerContext ctx,
QueueKey queueKey,
ExecutorService consumerExecutor,
ScheduledExecutorService scheduler,
ExecutorService taskExecutor) {
super(queueKey, null, null, ctx.getQueueFactory()::createToRuleEngineMsgConsumer, consumerExecutor, scheduler, taskExecutor);
this.ctx = ctx;
this.queueKey = queueKey;
this.stats = new TbRuleEngineConsumerStats(queueKey, ctx.getStatsFactory());
}
public void init(Queue queue) {
this.queue = queue;
if (queue.isConsumerPerPartition()) {
this.consumerWrapper = new ConsumerPerPartitionWrapper();
} else {
this.consumerWrapper = new SingleConsumerWrapper();
}
log.debug("[{}] Initialized consumer for queue: {}", queueKey, queue);
}
public void update(Queue queue) {
addTask(TbQueueConsumerManagerTask.configUpdate(queue));
}
public void update(Set<TopicPartitionInfo> partitions) {
addTask(TbQueueConsumerManagerTask.partitionChange(partitions));
}
public void delete(boolean drainQueue) {
addTask(TbQueueConsumerManagerTask.delete(drainQueue));
}
private void addTask(TbQueueConsumerManagerTask todo) {
if (stopped) {
return;
@Override
protected void processTask(TbQueueConsumerManagerTask task) {
if (task.getEvent() == QueueEvent.DELETE) {
doDelete(task.isDrainQueue());
}
tasks.add(todo);
log.trace("[{}] Added task: {}", queueKey, todo);
tryProcessTasks();
}
private void tryProcessTasks() {
if (!ctx.isReady()) {
log.debug("[{}] TbRuleEngineConsumerContext is not ready yet, will process tasks later", queueKey);
ctx.getScheduler().schedule(this::tryProcessTasks, 1, TimeUnit.SECONDS);
return;
}
ctx.getMgmtExecutor().submit(() -> {
if (lock.tryLock()) {
try {
Queue newConfiguration = null;
Set<TopicPartitionInfo> newPartitions = null;
while (!stopped) {
TbQueueConsumerManagerTask task = tasks.poll();
if (task == null) {
break;
}
log.trace("[{}] Processing task: {}", queueKey, task);
if (task.getEvent() == QueueEvent.PARTITION_CHANGE) {
newPartitions = task.getPartitions();
} else if (task.getEvent() == QueueEvent.CONFIG_UPDATE) {
newConfiguration = task.getQueue();
} else if (task.getEvent() == QueueEvent.DELETE) {
doDelete(task.isDrainQueue());
return;
}
}
if (stopped) {
return;
}
if (newConfiguration != null) {
doUpdate(newConfiguration);
}
if (newPartitions != null) {
doUpdate(newPartitions);
}
} catch (Exception e) {
log.error("[{}] Failed to process tasks", queueKey, e);
} finally {
lock.unlock();
}
} else {
log.trace("[{}] Failed to acquire lock", queueKey);
ctx.getScheduler().schedule(this::tryProcessTasks, 1, TimeUnit.SECONDS);
}
});
}
private void doUpdate(Queue newQueue) {
log.info("[{}] Processing queue update: {}", queueKey, newQueue);
var oldQueue = this.queue;
this.queue = newQueue;
if (log.isTraceEnabled()) {
log.trace("[{}] Old queue configuration: {}", queueKey, oldQueue);
log.trace("[{}] New queue configuration: {}", queueKey, newQueue);
}
if (oldQueue == null) {
init(queue);
} else if (newQueue.isConsumerPerPartition() != oldQueue.isConsumerPerPartition()) {
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::initiateStop);
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion);
init(queue);
if (partitions != null) {
doUpdate(partitions); // even if partitions number was changed, there can be no partition change event
}
} else {
// do nothing, because partitions change (if they changed) will be handled on PartitionChangeEvent,
// and changes to pollInterval/packProcessingTimeout/submitStrategy/processingStrategy will be picked up by consumer on the fly,
// and queue topic and name are immutable
}
}
private void doUpdate(Set<TopicPartitionInfo> partitions) {
this.partitions = partitions;
consumerWrapper.updatePartitions(partitions);
}
public void stop() {
log.debug("[{}] Stopping consumers", queueKey);
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::initiateStop);
stopped = true;
}
public void awaitStop() {
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion);
log.debug("[{}] Unsubscribed and stopped consumers", queueKey);
}
private void doDelete(boolean drainQueue) {
@ -212,7 +92,7 @@ public class TbRuleEngineQueueConsumerManager {
List<TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> queueConsumers = consumerWrapper.getConsumers().stream()
.map(TbQueueConsumerTask::getConsumer).collect(Collectors.toList());
ctx.getConsumersExecutor().submit(() -> {
consumerExecutor.submit(() -> {
if (drainQueue) {
drainQueue(queueConsumers);
}
@ -235,47 +115,10 @@ public class TbRuleEngineQueueConsumerManager {
});
}
private void launchConsumer(TbQueueConsumerTask consumerTask) {
log.info("[{}] Launching consumer", consumerTask.getKey());
Future<?> consumerLoop = ctx.getConsumersExecutor().submit(() -> {
ThingsBoardThreadFactory.updateCurrentThreadName(consumerTask.getKey().toString());
try {
consumerLoop(consumerTask.getConsumer());
} catch (Throwable e) {
log.error("Failure in consumer loop", e);
}
});
consumerTask.setTask(consumerLoop);
}
private void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer) {
while (!stopped && !consumer.isStopped()) {
try {
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(queue.getPollInterval());
if (msgs.isEmpty()) {
continue;
}
processMsgs(msgs, consumer, queue);
} catch (Exception e) {
if (!consumer.isStopped()) {
log.warn("Failed to process messages from queue", e);
try {
Thread.sleep(ctx.getPollDuration());
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
}
if (consumer.isStopped()) {
consumer.unsubscribe();
}
log.info("Rule Engine consumer stopped");
}
private void processMsgs(List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs,
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer,
Queue queue) throws InterruptedException {
@Override
protected void processMsgs(List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs,
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer,
Queue queue) throws Exception {
TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(queue);
TbRuleEngineProcessingStrategy ackStrategy = getProcessingStrategy(queue);
submitStrategy.init(msgs);
@ -320,7 +163,7 @@ public class TbRuleEngineQueueConsumerManager {
}
private void submitMessage(TbMsgPackProcessingContext packCtx, UUID id, TbProtoQueueMsg<ToRuleEngineMsg> msg) {
log.trace("[{}] Creating callback for topic {} message: {}", id, queue.getName(), msg.getValue());
log.trace("[{}] Creating callback for topic {} message: {}", id, config.getName(), msg.getValue());
ToRuleEngineMsg toRuleEngineMsg = msg.getValue();
TenantId tenantId = TenantId.fromUUID(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB()));
TbMsgCallback callback = ctx.isPrometheusStatsEnabled() ?
@ -328,7 +171,7 @@ public class TbRuleEngineQueueConsumerManager {
new TbMsgPackCallback(id, tenantId, packCtx);
try {
if (!toRuleEngineMsg.getTbMsg().isEmpty()) {
forwardToRuleEngineActor(queue.getName(), tenantId, toRuleEngineMsg, callback);
forwardToRuleEngineActor(config.getName(), tenantId, toRuleEngineMsg, callback);
} else {
callback.onSuccess();
}
@ -356,7 +199,7 @@ public class TbRuleEngineQueueConsumerManager {
log.info("[{}] {} to process [{}] messages", queueKey, prefix, map.size());
for (Map.Entry<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pending : map.entrySet()) {
ToRuleEngineMsg tmp = pending.getValue().getValue();
TbMsg tmpMsg = TbMsg.fromBytes(queue.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY);
TbMsg tmpMsg = TbMsg.fromBytes(config.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY);
RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey());
if (printAll) {
log.trace("[{}][{}] {} to process message: {}, Last Rule Node: {}", queueKey, TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo);
@ -379,7 +222,7 @@ public class TbRuleEngineQueueConsumerManager {
int n = 0;
while (System.currentTimeMillis() <= finishTs) {
for (TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer : consumers) {
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(queue.getPollInterval());
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(config.getPollInterval());
if (msgs.isEmpty()) {
continue;
}
@ -388,7 +231,7 @@ public class TbRuleEngineQueueConsumerManager {
MsgProtos.TbMsgProto tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg().toByteArray());
EntityId originator = EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB()));
TopicPartitionInfo tpi = ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, queue.getName(), TenantId.SYS_TENANT_ID, originator);
TopicPartitionInfo tpi = ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, config.getName(), TenantId.SYS_TENANT_ID, originator);
ctx.getProducerProvider().getRuleEngineMsgProducer().send(tpi, msg, null);
n++;
} catch (Throwable e) {
@ -399,90 +242,11 @@ public class TbRuleEngineQueueConsumerManager {
}
}
if (n > 0) {
log.info("Moved {} messages from {} to system {}", n, queueKey, queue.getName());
log.info("Moved {} messages from {} to system {}", n, queueKey, config.getName());
}
} catch (Exception e) {
log.error("[{}] Failed to drain queue", queueKey, e);
}
}
private static String partitionsToString(Collection<TopicPartitionInfo> partitions) {
return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.joining(", ", "[", "]"));
}
interface ConsumerWrapper {
void updatePartitions(Set<TopicPartitionInfo> partitions);
Collection<TbQueueConsumerTask> getConsumers();
}
class ConsumerPerPartitionWrapper implements ConsumerWrapper {
private final Map<TopicPartitionInfo, TbQueueConsumerTask> consumers = new HashMap<>();
@Override
public void updatePartitions(Set<TopicPartitionInfo> partitions) {
Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions);
addedPartitions.removeAll(consumers.keySet());
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(consumers.keySet());
removedPartitions.removeAll(partitions);
log.info("[{}] Added partitions: {}, removed partitions: {}", queueKey, partitionsToString(addedPartitions), partitionsToString(removedPartitions));
removedPartitions.forEach((tpi) -> {
consumers.get(tpi).initiateStop();
});
removedPartitions.forEach((tpi) -> {
consumers.remove(tpi).awaitCompletion();
});
addedPartitions.forEach((tpi) -> {
String key = queueKey + "-" + tpi.getPartition().orElse(-999999);
TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue));
consumers.put(tpi, consumer);
consumer.subscribe(Set.of(tpi));
launchConsumer(consumer);
});
}
@Override
public Collection<TbQueueConsumerTask> getConsumers() {
return consumers.values();
}
}
class SingleConsumerWrapper implements ConsumerWrapper {
private TbQueueConsumerTask consumer;
@Override
public void updatePartitions(Set<TopicPartitionInfo> partitions) {
log.info("[{}] New partitions: {}", queueKey, partitionsToString(partitions));
if (partitions.isEmpty()) {
if (consumer != null && consumer.isRunning()) {
consumer.initiateStop();
consumer.awaitCompletion();
}
consumer = null;
return;
}
if (consumer == null) {
consumer = new TbQueueConsumerTask(queueKey, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue));
}
consumer.subscribe(partitions);
if (!consumer.isRunning()) {
launchConsumer(consumer);
}
}
@Override
public Collection<TbQueueConsumerTask> getConsumers() {
if (consumer == null) {
return Collections.emptyList();
}
return List.of(consumer);
}
}
}

View File

@ -1629,6 +1629,8 @@ queue:
partitions: "${TB_QUEUE_CORE_PARTITIONS:10}"
# Timeout for processing a message pack by Core microservices
pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:2000}"
# Enable/disable a separate consumer per partition for Core queue
consumer-per-partition: "${TB_QUEUE_CORE_CONSUMER_PER_PARTITION:true}"
ota:
# Default topic name for OTA updates
topic: "${TB_QUEUE_CORE_OTA_TOPIC:tb_ota_package}"

View File

@ -92,7 +92,7 @@ public class HomePageApiTest extends AbstractControllerTest {
@MockBean
private SmsService smsService;
private static final int DEFAULT_DASHBOARDS_COUNT = 1;
private static final int DEFAULT_DASHBOARDS_COUNT = 0;
//For system administrator
@Test

View File

@ -27,6 +27,8 @@ import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.QueueId;
@ -65,6 +67,9 @@ import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@ -86,7 +91,6 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ -112,6 +116,9 @@ public class TbRuleEngineQueueConsumerManagerTest {
@Mock
private TbQueueAdmin queueAdmin;
private TbRuleEngineConsumerContext ruleEngineConsumerContext;
private ExecutorService consumersExecutor;
private ScheduledExecutorService scheduler;
private ExecutorService mgmtExecutor;
private TbRuleEngineQueueConsumerManager consumerManager;
private Queue queue;
@ -141,10 +148,10 @@ public class TbRuleEngineQueueConsumerManagerTest {
}).when(actorContext).tell(any());
ruleEngineMsgProducer = mock(TbQueueProducer.class);
when(producerProvider.getRuleEngineMsgProducer()).thenReturn(ruleEngineMsgProducer);
ruleEngineConsumerContext.setMgmtThreadPoolSize(2);
consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer"));
mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(3, "tb-rule-engine-mgmt");
scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-scheduler"));
ruleEngineConsumerContext.setTopicDeletionDelayInSec(5);
ruleEngineConsumerContext.init();
ruleEngineConsumerContext.setReady(false);
queue = new Queue();
queue.setName("Test");
@ -174,14 +181,23 @@ public class TbRuleEngineQueueConsumerManagerTest {
}).when(queueFactory).createToRuleEngineMsgConsumer(any());
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue);
consumerManager = new TbRuleEngineQueueConsumerManager(ruleEngineConsumerContext, queueKey);
consumerManager = TbRuleEngineQueueConsumerManager.create()
.ctx(ruleEngineConsumerContext)
.queueKey(queueKey)
.consumerExecutor(consumersExecutor)
.scheduler(scheduler)
.taskExecutor(mgmtExecutor)
.build();
}
@After
public void afterEach() {
consumerManager.stop();
consumerManager.awaitStop();
ruleEngineConsumerContext.stop();
consumersExecutor.shutdownNow();
scheduler.shutdownNow();
mgmtExecutor.shutdownNow();
if (generateQueueMsgs) {
await().atMost(10, TimeUnit.SECONDS)
@ -199,14 +215,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
Set<TopicPartitionInfo> partitions = createTpis(2, 3, 4);
consumerManager.update(partitions);
partitions = createTpis(3, 4, 5);
consumerManager.update(partitions);
partitions = createTpis(1, 2, 3);
consumerManager.update(partitions);
// simulated multiple partition change events before consumer is ready; only latest partitions should be processed
verifyNoInteractions(queueFactory);
ruleEngineConsumerContext.setReady(true);
await().atMost(2, TimeUnit.SECONDS)
.until(() -> consumers.size() == 3);
for (TopicPartitionInfo partition : partitions) {
@ -222,14 +231,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
Set<TopicPartitionInfo> partitions = createTpis(2, 3, 4);
consumerManager.update(partitions);
partitions = createTpis(3, 4, 5);
consumerManager.update(partitions);
partitions = createTpis(1, 2, 3);
consumerManager.update(partitions);
verifyNoInteractions(queueFactory);
ruleEngineConsumerContext.setReady(true);
await().atMost(2, TimeUnit.SECONDS)
.until(() -> consumers.size() == 1);
TestConsumer consumer = getConsumer();
@ -240,7 +242,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
public void testPartitionsUpdate_singleConsumer() {
queue.setConsumerPerPartition(false);
consumerManager.init(queue);
ruleEngineConsumerContext.setReady(true);
Set<TopicPartitionInfo> partitions = Collections.emptySet();
consumerManager.update(partitions);
@ -273,7 +274,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
public void testPartitionsUpdate_consumerPerPartition() {
queue.setConsumerPerPartition(true);
consumerManager.init(queue);
ruleEngineConsumerContext.setReady(true);
consumerManager.update(Collections.emptySet());
verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any());
@ -316,7 +316,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
public void testConfigUpdate_singleConsumer() {
queue.setConsumerPerPartition(false);
consumerManager.init(queue);
ruleEngineConsumerContext.setReady(true);
Set<TopicPartitionInfo> partitions = createTpis(1, 2, 3);
consumerManager.update(partitions);
TestConsumer consumer = getConsumer();
@ -342,7 +341,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
public void testConfigUpdate_consumerPerPartition() {
queue.setConsumerPerPartition(true);
consumerManager.init(queue);
ruleEngineConsumerContext.setReady(true);
Set<TopicPartitionInfo> partitions = createTpis(1, 2, 3);
consumerManager.update(partitions);
TestConsumer consumer1 = getConsumer(1);
@ -375,7 +373,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
public void testConfigUpdate_fromSingleToConsumerPerPartition() {
queue.setConsumerPerPartition(false);
consumerManager.init(queue);
ruleEngineConsumerContext.setReady(true);
Set<TopicPartitionInfo> partitions = createTpis(1, 2, 3);
consumerManager.update(partitions);
TestConsumer consumer = getConsumer();
@ -395,7 +392,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
public void testConfigUpdate_fromConsumerPerPartitionToSingle() {
queue.setConsumerPerPartition(true);
consumerManager.init(queue);
ruleEngineConsumerContext.setReady(true);
Set<TopicPartitionInfo> partitions = createTpis(1, 2, 3);
consumerManager.update(partitions);
TestConsumer consumer1 = getConsumer(1);
@ -419,7 +415,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
public void testStop() {
queue.setConsumerPerPartition(true);
consumerManager.init(queue);
ruleEngineConsumerContext.setReady(true);
consumerManager.update(createTpis(1));
TestConsumer consumer = getConsumer(1);
verifySubscribedAndLaunched(consumer, 1);
@ -437,7 +432,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
public void testDelete_consumerPerPartition() {
queue.setConsumerPerPartition(true);
consumerManager.init(queue);
ruleEngineConsumerContext.setReady(true);
Set<TopicPartitionInfo> partitions = createTpis(1, 2);
consumerManager.update(partitions);
TestConsumer consumer1 = getConsumer(1);
@ -459,7 +453,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
int msgCount = totalConsumedMsgs.get();
await().atLeast(2, TimeUnit.SECONDS) // based on topicDeletionDelayInSec(5) = 5 - ( 3 seconds the code may execute starting consumerManager.delete() call)
.atMost(7, TimeUnit.SECONDS)
.atMost(20, TimeUnit.SECONDS)
.untilAsserted(() -> {
partitions.stream()
.map(TopicPartitionInfo::getFullTopicName)
@ -481,7 +475,6 @@ public class TbRuleEngineQueueConsumerManagerTest {
public void testDelete_singleConsumer() {
queue.setConsumerPerPartition(false);
consumerManager.init(queue);
ruleEngineConsumerContext.setReady(true);
Set<TopicPartitionInfo> partitions = createTpis(1, 2);
consumerManager.update(partitions);
TestConsumer consumer = getConsumer();
@ -499,7 +492,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
int msgCount = totalConsumedMsgs.get();
await().atLeast(2, TimeUnit.SECONDS) // based on topicDeletionDelayInSec(5) = 5 - ( 3 seconds the code may execute starting consumerManager.delete() call)
.atMost(7, TimeUnit.SECONDS)
.atMost(20, TimeUnit.SECONDS)
.untilAsserted(() -> {
partitions.stream()
.map(TopicPartitionInfo::getFullTopicName)
@ -520,10 +513,9 @@ public class TbRuleEngineQueueConsumerManagerTest {
public void testManyDifferentUpdates() throws Exception {
queue.setConsumerPerPartition(RandomUtils.nextBoolean());
consumerManager.init(queue);
ruleEngineConsumerContext.setReady(true);
Supplier<Queue> queueConfigUpdater = () -> {
Queue oldConfig = consumerManager.getQueue();
Queue oldConfig = consumerManager.getConfig();
Queue newConfig = JacksonUtil.clone(oldConfig);
newConfig.setConsumerPerPartition(RandomUtils.nextBoolean());
newConfig.setPollInterval(RandomUtils.nextInt(100, 501));
@ -571,7 +563,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
Set<TopicPartitionInfo> expectedPartitions = latestPartitions;
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
assertThat(consumerManager.getQueue()).isEqualTo(expectedConfig);
assertThat(consumerManager.getConfig()).isEqualTo(expectedConfig);
assertThat(consumerManager.getPartitions()).isEqualTo(expectedPartitions);
});

View File

@ -39,4 +39,5 @@ public class OtherConfiguration extends PowerSavingConfiguration {
private Long pagingTransmissionWindow;
private String fwUpdateResource;
private String swUpdateResource;
private String defaultObjectIDVer;
}

View File

@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.validation.NoXss;
import java.util.Optional;
@Data
public class Queue extends BaseDataWithAdditionalInfo<QueueId> implements HasName, HasTenantId {
public class Queue extends BaseDataWithAdditionalInfo<QueueId> implements HasName, HasTenantId, QueueConfig {
private TenantId tenantId;
@NoXss
@Length(fieldName = "name")

View File

@ -0,0 +1,24 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.queue;
public interface QueueConfig {
boolean isConsumerPerPartition();
int getPollInterval();
}

View File

@ -15,11 +15,23 @@
*/
package org.thingsboard.server.common.msg.queue;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
@Getter
public enum ServiceType {
TB_CORE, TB_RULE_ENGINE, TB_TRANSPORT, JS_EXECUTOR, TB_VC_EXECUTOR;
TB_CORE("TB Core"),
TB_RULE_ENGINE("TB Rule Engine"),
TB_TRANSPORT("TB Transport"),
JS_EXECUTOR("JS Executor"),
TB_VC_EXECUTOR("TB VC Executor");
private final String label;
public static ServiceType of(String serviceType) {
return ServiceType.valueOf(serviceType.replace("-", "_").toUpperCase());
}
}

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue.consumer;
package org.thingsboard.server.queue.common.consumer;
import lombok.Builder;
import lombok.Getter;
@ -106,4 +106,5 @@ public class QueueConsumerManager<M extends TbQueueMsg> {
public interface MsgPackProcessor<M extends TbQueueMsg> {
void process(List<M> msgs, TbQueueConsumer<M> consumer) throws Exception;
}
}
}

View File

@ -217,7 +217,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(coreSettings.getTopic()));
consumerBuilder.clientId("monolith-core-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.clientId("monolith-core-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("monolith-core-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin);

View File

@ -56,6 +56,7 @@ import org.thingsboard.server.queue.settings.TbQueueVersionControlSettings;
import jakarta.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
@Component
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && '${service.type:null}'=='tb-core'")
@ -84,6 +85,8 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbQueueAdmin housekeeperAdmin;
private final TbQueueAdmin housekeeperReprocessingAdmin;
private final AtomicLong consumerCount = new AtomicLong();
public KafkaTbCoreQueueFactory(TopicService topicService,
TbKafkaSettings kafkaSettings,
TbServiceInfoProvider serviceInfoProvider,
@ -174,7 +177,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToCoreMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName(coreSettings.getTopic()));
consumerBuilder.clientId("tb-core-consumer-" + serviceInfoProvider.getServiceId());
consumerBuilder.clientId("tb-core-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("tb-core-node"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(coreAdmin);

View File

@ -29,6 +29,7 @@ import org.eclipse.leshan.server.registration.Registration;
import org.eclipse.leshan.server.registration.RegistrationListener;
import org.eclipse.leshan.server.registration.RegistrationUpdate;
import org.eclipse.leshan.server.send.SendListener;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler;
import java.util.Collection;
@ -101,7 +102,10 @@ public class LwM2mServerListener {
@Override
public void onResponse(SingleObservation observation, Registration registration, ObserveResponse response) {
if (registration != null) {
service.onUpdateValueAfterReadResponse(registration, convertObjectIdToVersionedId(observation.getPath().toString(), registration), response);
LwM2mClient lwM2MClient = service.getClientContext().getClientByEndpoint(registration.getEndpoint());
if (lwM2MClient != null) {
service.onUpdateValueAfterReadResponse(registration, convertObjectIdToVersionedId(observation.getPath().toString(), lwM2MClient), response);
}
}
}

View File

@ -27,10 +27,10 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.queue.util.TbLwM2mTransportComponent;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
@ -107,7 +107,8 @@ public class LwM2mVersionedModelProvider implements LwM2mModelProvider {
@Override
public ObjectModel getObjectModel(int objectId) {
String version = String.valueOf(registration.getSupportedVersion(objectId));
LwM2mClient lwM2mClient = lwM2mClientContext.getClientByEndpoint(registration.getEndpoint());
String version = lwM2mClient.getSupportedObjectVersion(objectId).toString();
if (version != null) {
return this.getObjectModelDynamic(objectId, version);
}
@ -116,7 +117,8 @@ public class LwM2mVersionedModelProvider implements LwM2mModelProvider {
@Override
public Collection<ObjectModel> getObjectModels() {
Map<Integer, LwM2m.Version> supportedObjects = this.registration.getSupportedObject();
LwM2mClient lwM2mClient = lwM2mClientContext.getClientByEndpoint(registration.getEndpoint());
Map<Integer, LwM2m.Version> supportedObjects = lwM2mClient.getSupportedClientObjects();
Collection<ObjectModel> result = new ArrayList<>(supportedObjects.size());
for (Map.Entry<Integer, LwM2m.Version> supportedObject : supportedObjects.entrySet()) {
ObjectModel objectModel = this.getObjectModelDynamic(supportedObject.getKey(), String.valueOf(supportedObject.getValue()));

View File

@ -21,6 +21,8 @@ import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.leshan.core.LwM2m;
import org.eclipse.leshan.core.LwM2m.Version;
import org.eclipse.leshan.core.link.Link;
import org.eclipse.leshan.core.link.attributes.Attribute;
import org.eclipse.leshan.core.model.ObjectModel;
import org.eclipse.leshan.core.model.ResourceModel;
@ -37,6 +39,7 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.data.PowerMode;
import org.thingsboard.server.common.data.device.profile.Lwm2mDeviceProfileTransportConfiguration;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
@ -61,8 +64,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.thingsboard.server.common.data.lwm2m.LwM2mConstants.LWM2M_SEPARATOR_PATH;
import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.LWM2M_OBJECT_VERSION_DEFAULT;
import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.convertMultiResourceValuesFromRpcBody;
import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.convertObjectIdToVersionedId;
import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.equalsResourceTypeGetSimpleName;
import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.fromVersionedIdToObjectId;
import static org.thingsboard.server.transport.lwm2m.utils.LwM2MTransportUtil.getVerFromPathIdVerOrId;
@ -129,6 +132,12 @@ public class LwM2mClient {
@Setter
private UUID lastSentRpcId;
@Setter
private LwM2m.Version defaultObjectIDVer;
@Getter
private Map<Integer, Version> supportedClientObjects;
public Object clone() throws CloneNotSupportedException {
return super.clone();
}
@ -142,6 +151,7 @@ public class LwM2mClient {
this.state = LwM2MClientState.CREATED;
this.lock = new ReentrantLock();
this.retryAttempts = new AtomicInteger(0);
this.supportedClientObjects = null;
}
public void init(ValidateDeviceCredentialsResponse credentials, UUID sessionId) {
@ -153,12 +163,14 @@ public class LwM2mClient {
this.edrxCycle = credentials.getDeviceInfo().getEdrxCycle();
this.psmActivityTimer = credentials.getDeviceInfo().getPsmActivityTimer();
this.pagingTransmissionWindow = credentials.getDeviceInfo().getPagingTransmissionWindow();
this.defaultObjectIDVer = getObjectIDVerFromDeviceProfile(credentials.getDeviceProfile());
}
public void setRegistration(Registration registration) {
this.registration = registration;
this.clientSupportContentFormats = clientSupportContentFormat(registration);
this.defaultContentFormat = calculateDefaultContentFormat(registration);
this.setSupportedClientObjects();
}
public void lock() {
@ -197,6 +209,18 @@ public class LwM2mClient {
builder.setDeviceType(deviceProfile.getName());
}
private LwM2m.Version getObjectIDVerFromDeviceProfile(DeviceProfile deviceProfile) {
String defaultObjectIdVer = null;
if (deviceProfile != null) {
defaultObjectIdVer = ((Lwm2mDeviceProfileTransportConfiguration) deviceProfile
.getProfileData()
.getTransportConfiguration())
.getClientLwM2mSettings()
.getDefaultObjectIDVer();
}
return new Version(defaultObjectIdVer == null ? LWM2M_OBJECT_VERSION_DEFAULT : defaultObjectIdVer);
}
public void refreshSessionId(String nodeId) {
UUID newId = UUID.randomUUID();
SessionInfoProto.Builder builder = SessionInfoProto.newBuilder().mergeFrom(session);
@ -240,67 +264,28 @@ public class LwM2mClient {
}
}
public Object getResourceValue(String pathRezIdVer, String pathRezId) {
String pathRez = pathRezIdVer == null ? convertObjectIdToVersionedId(pathRezId, this.registration) : pathRezIdVer;
if (this.resources.get(pathRez) != null) {
return this.resources.get(pathRez).getLwM2mResource().getValue();
}
return null;
}
public Object getResourceNameByRezId(String pathRezIdVer, String pathRezId) {
String pathRez = pathRezIdVer == null ? convertObjectIdToVersionedId(pathRezId, this.registration) : pathRezIdVer;
if (this.resources.get(pathRez) != null) {
return this.resources.get(pathRez).getResourceModel().name;
}
return null;
}
public String getRezIdByResourceNameAndObjectInstanceId(String resourceName, String pathObjectInstanceIdVer, LwM2mModelProvider modelProvider) {
LwM2mPath pathIds = getLwM2mPathFromString(pathObjectInstanceIdVer);
if (pathIds.isObjectInstance()) {
Set<Integer> rezIds = modelProvider.getObjectModel(registration)
.getObjectModel(pathIds.getObjectId()).resources.entrySet()
.stream()
.filter(map -> resourceName.equals(map.getValue().name))
.map(map -> map.getKey())
.collect(Collectors.toSet());
return rezIds.size() > 0 ? String.valueOf(rezIds.stream().findFirst().get()) : null;
}
return null;
}
public ResourceModel getResourceModel(String pathIdVer, LwM2mModelProvider modelProvider) {
LwM2mPath pathIds = getLwM2mPathFromString(pathIdVer);
String verSupportedObject = String.valueOf(registration.getSupportedObject().get(pathIds.getObjectId()));
String verSupportedObject = String.valueOf(this.getSupportedObjectVersion(pathIds.getObjectId()));
String verRez = getVerFromPathIdVerOrId(pathIdVer);
return verRez != null && verRez.equals(verSupportedObject) ? modelProvider.getObjectModel(registration)
.getResourceModel(pathIds.getObjectId(), pathIds.getResourceId()) : null;
}
public boolean isResourceMultiInstances(String pathIdVer, LwM2mModelProvider modelProvider) {
var resourceModel = getResourceModel(pathIdVer, modelProvider);
if (resourceModel != null && resourceModel.multiple != null) {
return resourceModel.multiple;
} else {
return false;
}
}
public ObjectModel getObjectModel(String pathIdVer, LwM2mModelProvider modelProvider) {
try {
LwM2mPath pathIds = getLwM2mPathFromString(pathIdVer);
String verSupportedObject = String.valueOf(registration.getSupportedObject().get(pathIds.getObjectId()));
String verSupportedObject = String.valueOf(this.getSupportedObjectVersion(pathIds.getObjectId()));
String verRez = getVerFromPathIdVerOrId(pathIdVer);
return verRez != null && verRez.equals(verSupportedObject) ? modelProvider.getObjectModel(registration)
.getObjectModel(pathIds.getObjectId()) : null;
} catch (Exception e) {
if (registration == null) {
log.error("[{}] Failed Registration is null, GetObjectModelRegistration. ", this.endpoint, e);
} else if (registration.getSupportedObject() == null) {
} else if (this.getSupportedClientObjects() == null) {
log.error("[{}] Failed SupportedObject in Registration, GetObjectModelRegistration.", this.endpoint, e);
} else {
log.error("[{}] Failed ModelProvider.getObjectModel [{}] in Registration. ", this.endpoint, registration.getSupportedObject(), e);
log.error("[{}] Failed ModelProvider.getObjectModel [{}] in Registration. ", this.endpoint, this.getSupportedClientObjects(), e);
}
return null;
}
@ -371,7 +356,7 @@ public class LwM2mClient {
public String isValidObjectVersion(String path) {
LwM2mPath pathIds = getLwM2mPathFromString(path);
LwM2m.Version verSupportedObject = registration.getSupportedObject().get(pathIds.getObjectId());
LwM2m.Version verSupportedObject = this.getSupportedObjectVersion(pathIds.getObjectId());
if (verSupportedObject == null) {
return String.format("Specified object id %s absent in the list supported objects of the client or is security object!", pathIds.getObjectId());
} else {
@ -460,5 +445,31 @@ public class LwM2mClient {
return new LwM2mPath(fromVersionedIdToObjectId(path));
}
public LwM2m.Version getDefaultObjectIDVer() {
return this.defaultObjectIDVer == null ? new Version(LWM2M_OBJECT_VERSION_DEFAULT) : this.defaultObjectIDVer;
}
public LwM2m.Version getSupportedObjectVersion(Integer objectid) {
return this.supportedClientObjects.get(objectid);
}
private void setSupportedClientObjects(){
this.supportedClientObjects = new ConcurrentHashMap<>();
for (Link link: this.registration.getSortedObjectLinks()) {
LwM2mPath lwM2mPath = new LwM2mPath(link.getUriReference());
if (lwM2mPath.isObject()) {
LwM2m.Version ver;
if (link.getAttributes().get("ver")!= null) {
ver = (Version) link.getAttributes().get("ver").getValue();
} else {
ver = getDefaultObjectIDVer();
}
this.supportedClientObjects.put(lwM2mPath.getObjectId(), ver);
} else if (lwM2mPath.getObjectId() != null && this.supportedClientObjects.get(lwM2mPath.getObjectId()) == null){
this.supportedClientObjects.put(lwM2mPath.getObjectId(), getDefaultObjectIDVer());
}
}
}
}

View File

@ -396,7 +396,7 @@ public class LwM2mClientContextImpl implements LwM2mClientContext {
Arrays.stream(client.getRegistration().getObjectLinks()).forEach(link -> {
LwM2mPath pathIds = new LwM2mPath(link.getUriReference());
if (!pathIds.isRoot()) {
clientObjects.add(convertObjectIdToVersionedId(link.getUriReference(), client.getRegistration()));
clientObjects.add(convertObjectIdToVersionedId(link.getUriReference(), client));
}
});
return (clientObjects.size() > 0) ? clientObjects : null;

View File

@ -481,7 +481,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
}
private void startUpdateUsingUrl(LwM2mClient client, String id, String url) {
String targetIdVer = convertObjectIdToVersionedId(id, client.getRegistration());
String targetIdVer = convertObjectIdToVersionedId(id, client);
TbLwM2MWriteReplaceRequest request = TbLwM2MWriteReplaceRequest.builder().versionedId(targetIdVer).value(url).timeout(clientContext.getRequestTimeout(client)).build();
downlinkHandler.sendWriteReplaceRequest(client, request, new TbLwM2MWriteResponseCallback(uplinkHandler, logService, client, targetIdVer));
}
@ -512,10 +512,10 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
}
switch (strategy) {
case OBJ_5_BINARY:
startUpdateUsingBinary(client, convertObjectIdToVersionedId(FW_PACKAGE_5_ID, client.getRegistration()), otaPackageId);
startUpdateUsingBinary(client, convertObjectIdToVersionedId(FW_PACKAGE_5_ID, client), otaPackageId);
break;
case OBJ_19_BINARY:
startUpdateUsingBinary(client, convertObjectIdToVersionedId(FW_PACKAGE_19_ID, client.getRegistration()), otaPackageId);
startUpdateUsingBinary(client, convertObjectIdToVersionedId(FW_PACKAGE_19_ID, client), otaPackageId);
break;
case OBJ_5_TEMP_URL:
startUpdateUsingUrl(client, FW_URL_ID, info.getBaseUrl() + "/" + FIRMWARE_UPDATE_COAP_RESOURCE + "/" + otaPackageId.toString());
@ -534,7 +534,7 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
LwM2MSoftwareUpdateStrategy strategy = info.getStrategy();
switch (strategy) {
case BINARY:
startUpdateUsingBinary(client, convertObjectIdToVersionedId(SW_PACKAGE_ID, client.getRegistration()), otaPackageId);
startUpdateUsingBinary(client, convertObjectIdToVersionedId(SW_PACKAGE_ID, client), otaPackageId);
break;
case TEMP_URL:
startUpdateUsingUrl(client, SW_PACKAGE_URI_ID, info.getBaseUrl() + "/" + FIRMWARE_UPDATE_COAP_RESOURCE + "/" + otaPackageId.toString());
@ -566,19 +566,19 @@ public class DefaultLwM2MOtaUpdateService extends LwM2MExecutorAwareService impl
}
private void executeFwUpdate(LwM2mClient client) {
String fwExecuteVerId = convertObjectIdToVersionedId(FW_EXECUTE_ID, client.getRegistration());
String fwExecuteVerId = convertObjectIdToVersionedId(FW_EXECUTE_ID, client);
TbLwM2MExecuteRequest request = TbLwM2MExecuteRequest.builder().versionedId(fwExecuteVerId).timeout(clientContext.getRequestTimeout(client)).build();
downlinkHandler.sendExecuteRequest(client, request, new TbLwM2MExecuteCallback(logService, client, fwExecuteVerId));
}
private void executeSwInstall(LwM2mClient client) {
String swInstallVerId = convertObjectIdToVersionedId(SW_INSTALL_ID, client.getRegistration());
String swInstallVerId = convertObjectIdToVersionedId(SW_INSTALL_ID, client);
TbLwM2MExecuteRequest request = TbLwM2MExecuteRequest.builder().versionedId(swInstallVerId).timeout(clientContext.getRequestTimeout(client)).build();
downlinkHandler.sendExecuteRequest(client, request, new TbLwM2MExecuteCallback(logService, client, swInstallVerId));
}
private void executeSwUninstallForUpdate(LwM2mClient client) {
String swInInstallVerId = convertObjectIdToVersionedId(SW_UN_INSTALL_ID, client.getRegistration());
String swInInstallVerId = convertObjectIdToVersionedId(SW_UN_INSTALL_ID, client);
TbLwM2MExecuteRequest request = TbLwM2MExecuteRequest.builder().versionedId(swInInstallVerId).params("1").timeout(clientContext.getRequestTimeout(client)).build();
downlinkHandler.sendExecuteRequest(client, request, new TbLwM2MExecuteCallback(logService, client, swInInstallVerId));
}

View File

@ -23,6 +23,7 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.protobuf.util.JsonFormat;
import lombok.SneakyThrows;
import org.eclipse.leshan.core.LwM2m.Version;
import org.eclipse.leshan.core.model.ResourceModel;
import org.eclipse.leshan.core.node.LwM2mMultipleResource;
import org.eclipse.leshan.core.node.LwM2mNodeException;
@ -107,6 +108,8 @@ public class LwM2MClientSerDes {
if (client.getPagingTransmissionWindow() != null) {
o.addProperty("pagingTransmissionWindow", client.getPagingTransmissionWindow());
}
o.addProperty("defaultObjectIDVer", client.getDefaultObjectIDVer().toString());
if (client.getRegistration() != null) {
String registrationAddress = client.getRegistration().getAddress().toString();
JsonNode registrationNode = registrationSerDes.jSerialize(client.getRegistration());
@ -339,6 +342,13 @@ public class LwM2MClientSerDes {
pagingTransmissionWindowField.set(lwM2mClient, pagingTransmissionWindow.getAsLong());
}
JsonElement defaultObjectIDVer = o.get("defaultObjectIDVer");
if (defaultObjectIDVer != null) {
Field defaultObjectIDVerField = lwM2mClientClass.getDeclaredField("defaultObjectIDVer");
defaultObjectIDVerField.setAccessible(true);
defaultObjectIDVerField.set(lwM2mClient, new Version(defaultObjectIDVer.getAsString()));
}
JsonElement registration = o.get("registration");
if (registration != null) {
lwM2mClient.setRegistration(registrationSerDes.deserialize(toJsonNode(registration.getAsString())));

View File

@ -368,7 +368,7 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
LwM2mPath path = entry.getKey();
LwM2mNode node = entry.getValue();
LwM2mClient lwM2MClient = clientContext.getClientByEndpoint(registration.getEndpoint());
String stringPath = convertObjectIdToVersionedId(path.toString(), registration);
String stringPath = convertObjectIdToVersionedId(path.toString(), lwM2MClient);
ObjectModel objectModelVersion = lwM2MClient.getObjectModel(stringPath, modelProvider);
if (objectModelVersion != null) {
if (node instanceof LwM2mObject) {
@ -584,27 +584,27 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
private void updateResourcesValue(LwM2mClient lwM2MClient, LwM2mResource lwM2mResource, String path, Mode mode, int code) {
Registration registration = lwM2MClient.getRegistration();
if (lwM2MClient.saveResourceValue(path, lwM2mResource, modelProvider, mode)) {
if (path.equals(convertObjectIdToVersionedId(FW_NAME_ID, registration))) {
if (path.equals(convertObjectIdToVersionedId(FW_NAME_ID, lwM2MClient))) {
otaService.onCurrentFirmwareNameUpdate(lwM2MClient, (String) lwM2mResource.getValue());
} else if (path.equals(convertObjectIdToVersionedId(FW_3_VER_ID, registration))) {
} else if (path.equals(convertObjectIdToVersionedId(FW_3_VER_ID, lwM2MClient))) {
otaService.onCurrentFirmwareVersion3Update(lwM2MClient, (String) lwM2mResource.getValue());
} else if (path.equals(convertObjectIdToVersionedId(FW_VER_ID, registration))) {
} else if (path.equals(convertObjectIdToVersionedId(FW_VER_ID, lwM2MClient))) {
otaService.onCurrentFirmwareVersionUpdate(lwM2MClient, (String) lwM2mResource.getValue());
} else if (path.equals(convertObjectIdToVersionedId(FW_STATE_ID, registration))) {
} else if (path.equals(convertObjectIdToVersionedId(FW_STATE_ID, lwM2MClient))) {
otaService.onCurrentFirmwareStateUpdate(lwM2MClient, (Long) lwM2mResource.getValue());
} else if (path.equals(convertObjectIdToVersionedId(FW_RESULT_ID, registration))) {
} else if (path.equals(convertObjectIdToVersionedId(FW_RESULT_ID, lwM2MClient))) {
otaService.onCurrentFirmwareResultUpdate(lwM2MClient, (Long) lwM2mResource.getValue());
} else if (path.equals(convertObjectIdToVersionedId(FW_DELIVERY_METHOD, registration))) {
} else if (path.equals(convertObjectIdToVersionedId(FW_DELIVERY_METHOD, lwM2MClient))) {
otaService.onCurrentFirmwareDeliveryMethodUpdate(lwM2MClient, (Long) lwM2mResource.getValue());
} else if (path.equals(convertObjectIdToVersionedId(SW_NAME_ID, registration))) {
} else if (path.equals(convertObjectIdToVersionedId(SW_NAME_ID, lwM2MClient))) {
otaService.onCurrentSoftwareNameUpdate(lwM2MClient, (String) lwM2mResource.getValue());
} else if (path.equals(convertObjectIdToVersionedId(SW_VER_ID, registration))) {
} else if (path.equals(convertObjectIdToVersionedId(SW_VER_ID, lwM2MClient))) {
otaService.onCurrentSoftwareVersionUpdate(lwM2MClient, (String) lwM2mResource.getValue());
} else if (path.equals(convertObjectIdToVersionedId(SW_3_VER_ID, registration))) {
} else if (path.equals(convertObjectIdToVersionedId(SW_3_VER_ID, lwM2MClient))) {
otaService.onCurrentSoftwareVersion3Update(lwM2MClient, (String) lwM2mResource.getValue());
} else if (path.equals(convertObjectIdToVersionedId(SW_STATE_ID, registration))) {
} else if (path.equals(convertObjectIdToVersionedId(SW_STATE_ID, lwM2MClient))) {
otaService.onCurrentSoftwareStateUpdate(lwM2MClient, (Long) lwM2mResource.getValue());
} else if (path.equals(convertObjectIdToVersionedId(SW_RESULT_ID, registration))) {
} else if (path.equals(convertObjectIdToVersionedId(SW_RESULT_ID, lwM2MClient))) {
otaService.onCurrentSoftwareResultUpdate(lwM2MClient, (Long) lwM2mResource.getValue());
}
if (ResponseCode.BAD_REQUEST.getCode() > code) {
@ -969,6 +969,10 @@ public class DefaultLwM2mUplinkMsgHandler extends LwM2MExecutorAwareService impl
}
}
public LwM2mClientContext getClientContext(){
return this.clientContext;
};
private Map<String, String> getNamesFromProfileForSharedAttributes(LwM2mClient lwM2MClient) {
Lwm2mDeviceProfileTransportConfiguration profile = clientContext.getProfile(lwM2MClient.getProfileId());
return profile.getObserveAttr().getKeyName();

View File

@ -30,6 +30,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClientContext;
import java.util.Collection;
import java.util.Optional;
@ -77,4 +78,5 @@ public interface LwM2mUplinkMsgHandler {
LwM2mValueConverter getConverter();
LwM2mClientContext getClientContext();
}

View File

@ -20,16 +20,13 @@ import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.elements.config.Configuration;
import org.eclipse.leshan.core.model.LwM2mModel;
import org.eclipse.leshan.core.model.ObjectLoader;
import org.eclipse.leshan.core.model.ObjectModel;
import org.eclipse.leshan.core.model.ResourceModel;
import org.eclipse.leshan.core.model.StaticModel;
import org.eclipse.leshan.core.node.LwM2mMultipleResource;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.LwM2mResource;
import org.eclipse.leshan.core.node.LwM2mSingleResource;
import org.eclipse.leshan.core.node.codec.CodecException;
import org.eclipse.leshan.core.util.Hex;
import org.eclipse.leshan.server.registration.Registration;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceTransportType;
@ -49,8 +46,6 @@ import org.thingsboard.server.transport.lwm2m.server.ota.firmware.FirmwareUpdate
import org.thingsboard.server.transport.lwm2m.server.ota.software.SoftwareUpdateResult;
import org.thingsboard.server.transport.lwm2m.server.ota.software.SoftwareUpdateState;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -84,56 +79,6 @@ public class LwM2MTransportUtil {
public static final String LOG_LWM2M_WARN = "warn";
public static final int BOOTSTRAP_DEFAULT_SHORT_ID_0 = 0;
public enum LwM2MClientStrategy {
CLIENT_STRATEGY_1(1, "Read only resources marked as observation"),
CLIENT_STRATEGY_2(2, "Read all client resources");
public int code;
public String type;
LwM2MClientStrategy(int code, String type) {
this.code = code;
this.type = type;
}
public static LwM2MClientStrategy fromStrategyClientByType(String type) {
for (LwM2MClientStrategy to : LwM2MClientStrategy.values()) {
if (to.type.equals(type)) {
return to;
}
}
throw new IllegalArgumentException(String.format("Unsupported Client Strategy type : %s", type));
}
public static LwM2MClientStrategy fromStrategyClientByCode(int code) {
for (LwM2MClientStrategy to : LwM2MClientStrategy.values()) {
if (to.code == code) {
return to;
}
}
throw new IllegalArgumentException(String.format("Unsupported Client Strategy code : %s", code));
}
}
public static boolean equalsResourceValue(Object valueOld, Object valueNew, ResourceModel.Type type, LwM2mPath
resourcePath) throws CodecException {
switch (type) {
case BOOLEAN:
case INTEGER:
case FLOAT:
return String.valueOf(valueOld).equals(String.valueOf(valueNew));
case TIME:
return ((Date) valueOld).getTime() == ((Date) valueNew).getTime();
case STRING:
case OBJLNK:
return valueOld.equals(valueNew);
case OPAQUE:
return Arrays.equals(Hex.decodeHex(((String) valueOld).toCharArray()), Hex.decodeHex(((String) valueNew).toCharArray()));
default:
throw new CodecException("Invalid value type for resource %s, type %s", resourcePath, type);
}
}
public static LwM2mOtaConvert convertOtaUpdateValueToString(String pathIdVer, Object value, ResourceModel.Type currentType) {
String path = fromVersionedIdToObjectId(pathIdVer);
LwM2mOtaConvert lwM2mOtaConvert = new LwM2mOtaConvert();
@ -210,22 +155,8 @@ public class LwM2MTransportUtil {
return null;
}
public static String validPathIdVer(String pathIdVer, Registration registration) throws
IllegalArgumentException {
if (!pathIdVer.contains(LWM2M_SEPARATOR_PATH)) {
throw new IllegalArgumentException(String.format("Error:"));
} else {
String[] keyArray = pathIdVer.split(LWM2M_SEPARATOR_PATH);
if (keyArray.length > 1 && keyArray[1].split(LWM2M_SEPARATOR_KEY).length == 2) {
return pathIdVer;
} else {
return convertObjectIdToVersionedId(pathIdVer, registration);
}
}
}
public static String convertObjectIdToVersionedId(String path, Registration registration) {
String ver = String.valueOf(registration.getSupportedObject().get(new LwM2mPath(path).getObjectId()));
public static String convertObjectIdToVersionedId(String path, LwM2mClient lwM2MClient) {
String ver = String.valueOf(lwM2MClient.getSupportedObjectVersion(new LwM2mPath(path).getObjectId()));
return convertObjectIdToVerId(path, ver);
}
public static String convertObjectIdToVerId(String path, String ver) {
@ -243,14 +174,6 @@ public class LwM2MTransportUtil {
}
}
public static String validateObjectVerFromKey(String key) {
try {
return (key.split(LWM2M_SEPARATOR_PATH)[1].split(LWM2M_SEPARATOR_KEY)[1]);
} catch (Exception e) {
return ObjectModel.DEFAULT_VERSION;
}
}
/**
* "UNSIGNED_INTEGER": // Number -> Integer Example:
* Alarm Timestamp [32-bit unsigned integer]

View File

@ -21,6 +21,8 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -97,6 +99,7 @@ import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
@ -106,14 +109,12 @@ import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbTransportComponent;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@ -179,19 +180,17 @@ public class DefaultTransportService extends TransportActivityManager implements
protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate;
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
protected TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> tbCoreMsgProducer;
protected TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> transportNotificationsConsumer;
protected QueueConsumerManager<TbProtoQueueMsg<ToTransportMsg>> transportNotificationsConsumer;
protected MessagesStats ruleEngineProducerStats;
protected MessagesStats tbCoreProducerStats;
protected MessagesStats transportApiStats;
protected ExecutorService transportCallbackExecutor;
private ExecutorService mainConsumerExecutor;
private ExecutorService consumerExecutor;
private final Map<String, RpcRequestMetadata> toServerRpcPendingMap = new ConcurrentHashMap<>();
private volatile boolean stopped = false;
public DefaultTransportService(PartitionService partitionService,
TbServiceInfoProvider serviceInfoProvider,
TbTransportQueueFactory queueProvider,
@ -232,42 +231,33 @@ public class DefaultTransportService extends TransportActivityManager implements
transportApiRequestTemplate.setMessagesStats(transportApiStats);
ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer();
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
transportNotificationsConsumer = queueProvider.createTransportNotificationsConsumer();
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceInfoProvider.getServiceId());
transportNotificationsConsumer.subscribe(Collections.singleton(tpi));
transportApiRequestTemplate.init();
mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer"));
consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer"));
transportNotificationsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToTransportMsg>>builder()
.name("TB Transport")
.msgPackProcessor(this::processNotificationMsgs)
.pollInterval(notificationsPollDuration)
.consumerCreator(queueProvider::createTransportNotificationsConsumer)
.consumerExecutor(consumerExecutor)
.build();
}
@AfterStartUp(order = AfterStartUp.TRANSPORT_SERVICE)
public void start() {
mainConsumerExecutor.execute(() -> {
while (!stopped) {
try {
List<TbProtoQueueMsg<ToTransportMsg>> records = transportNotificationsConsumer.poll(notificationsPollDuration);
if (records.size() == 0) {
continue;
}
records.forEach(record -> {
try {
processToTransportMsg(record.getValue());
} catch (Throwable e) {
log.warn("Failed to process the notification.", e);
}
});
transportNotificationsConsumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain messages from queue.", e);
try {
Thread.sleep(notificationsPollDuration);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new requests", e2);
}
}
}
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceInfoProvider.getServiceId());
transportNotificationsConsumer.subscribe(Set.of(tpi));
transportNotificationsConsumer.launch();
}
private void processNotificationMsgs(List<TbProtoQueueMsg<ToTransportMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> consumer) {
msgs.forEach(msg -> {
try {
processToTransportMsg(msg.getValue());
} catch (Throwable e) {
log.warn("Failed to process the notification.", e);
}
});
consumer.commit();
}
private void invalidateRateLimits() {
@ -276,16 +266,14 @@ public class DefaultTransportService extends TransportActivityManager implements
@PreDestroy
public void destroy() {
stopped = true;
if (transportNotificationsConsumer != null) {
transportNotificationsConsumer.unsubscribe();
transportNotificationsConsumer.stop();
}
if (transportCallbackExecutor != null) {
transportCallbackExecutor.shutdownNow();
}
if (mainConsumerExecutor != null) {
mainConsumerExecutor.shutdownNow();
if (consumerExecutor != null) {
consumerExecutor.shutdownNow();
}
if (transportApiRequestTemplate != null) {
transportApiRequestTemplate.stop();

View File

@ -21,14 +21,13 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.eclipse.jgit.errors.LargeObjectException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.EntityType;
@ -68,16 +67,16 @@ import org.thingsboard.server.gen.transport.TransportProtos.VersionedEntityInfoP
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbVersionControlQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbVersionControlComponent;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@ -115,9 +114,8 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
private final Map<TenantId, PendingCommit> pendingCommitMap = new HashMap<>();
private volatile ExecutorService consumerExecutor;
private volatile TbQueueConsumer<TbProtoQueueMsg<ToVersionControlServiceMsg>> consumer;
private volatile QueueConsumerManager<TbProtoQueueMsg<ToVersionControlServiceMsg>> consumer;
private volatile TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> producer;
private volatile boolean stopped = false;
@Value("${queue.vc.poll-interval:25}")
private long pollDuration;
@ -134,20 +132,25 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
@PostConstruct
public void init() {
consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("vc-consumer"));
consumerExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("vc-consumer"));
var threadFactory = ThingsBoardThreadFactory.forName("vc-io-thread");
for (int i = 0; i < ioPoolSize; i++) {
ioThreads.add(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(threadFactory)));
}
producer = producerProvider.getTbCoreNotificationsMsgProducer();
consumer = queueFactory.createToVersionControlMsgConsumer();
consumer = QueueConsumerManager.<TbProtoQueueMsg<ToVersionControlServiceMsg>>builder()
.name("TB Version Control")
.msgPackProcessor(this::processMsgs)
.pollInterval(pollDuration)
.consumerCreator(queueFactory::createToVersionControlMsgConsumer)
.consumerExecutor(consumerExecutor)
.build();
}
@PreDestroy
public void stop() {
stopped = true;
if (consumer != null) {
consumer.unsubscribe();
consumer.stop();
}
if (consumerExecutor != null) {
consumerExecutor.shutdownNow();
@ -179,48 +182,29 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
return ServiceType.TB_VC_EXECUTOR.equals(event.getServiceType());
}
@EventListener(ApplicationReadyEvent.class)
@Order(value = 2)
public void onApplicationEvent(ApplicationReadyEvent event) {
consumerExecutor.execute(() -> consumerLoop(consumer));
@AfterStartUp(order = 2)
public void afterStartUp() {
consumer.launch();
}
void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToVersionControlServiceMsg>> consumer) {
while (!stopped && !consumer.isStopped()) {
List<ListenableFuture<?>> futures = new ArrayList<>();
try {
List<TbProtoQueueMsg<ToVersionControlServiceMsg>> msgs = consumer.poll(pollDuration);
if (msgs.isEmpty()) {
continue;
}
for (TbProtoQueueMsg<ToVersionControlServiceMsg> msgWrapper : msgs) {
ToVersionControlServiceMsg msg = msgWrapper.getValue();
var ctx = new VersionControlRequestCtx(msg, msg.hasClearRepositoryRequest() ? null : ProtoUtils.fromProto(msg.getVcSettings()));
long startTs = System.currentTimeMillis();
log.trace("[{}][{}] RECEIVED task: {}", ctx.getTenantId(), ctx.getRequestId(), msg);
int threadIdx = Math.abs(ctx.getTenantId().hashCode() % ioPoolSize);
ListenableFuture<Void> future = ioThreads.get(threadIdx).submit(() -> processMessage(ctx, msg));
logTaskExecution(ctx, future, startTs);
futures.add(future);
}
try {
Futures.allAsList(futures).get(packProcessingTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
log.info("Timeout for processing the version control tasks.", e);
}
consumer.commit();
} catch (Exception e) {
if (!stopped) {
log.warn("Failed to obtain version control requests from queue.", e);
try {
Thread.sleep(pollDuration);
} catch (InterruptedException e2) {
log.trace("Failed to wait until the server has capacity to handle new version control messages", e2);
}
}
}
void processMsgs(List<TbProtoQueueMsg<ToVersionControlServiceMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToVersionControlServiceMsg>> consumer) throws Exception {
List<ListenableFuture<?>> futures = new ArrayList<>();
for (TbProtoQueueMsg<ToVersionControlServiceMsg> msgWrapper : msgs) {
ToVersionControlServiceMsg msg = msgWrapper.getValue();
var ctx = new VersionControlRequestCtx(msg, msg.hasClearRepositoryRequest() ? null : ProtoUtils.fromProto(msg.getVcSettings()));
long startTs = System.currentTimeMillis();
log.trace("[{}][{}] RECEIVED task: {}", ctx.getTenantId(), ctx.getRequestId(), msg);
int threadIdx = Math.abs(ctx.getTenantId().hashCode() % ioPoolSize);
ListenableFuture<Void> future = ioThreads.get(threadIdx).submit(() -> processMessage(ctx, msg));
logTaskExecution(ctx, future, startTs);
futures.add(future);
}
log.info("TB Version Control request consumer stopped.");
try {
Futures.allAsList(futures).get(packProcessingTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
log.info("Timeout for processing the version control tasks.", e);
}
consumer.commit();
}
private Void processMessage(VersionControlRequestCtx ctx, ToVersionControlServiceMsg msg) {
@ -273,7 +257,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
var ids = vcService.listEntitiesAtVersion(ctx.getTenantId(), request.getVersionId(), path)
.stream().skip(request.getOffset()).limit(request.getLimit()).collect(Collectors.toList());
if (!ids.isEmpty()) {
for (int i = 0; i < ids.size(); i++){
for (int i = 0; i < ids.size(); i++) {
VersionedEntityInfo info = ids.get(i);
var data = vcService.getFileContentAtCommit(ctx.getTenantId(),
getRelativePath(info.getExternalId().getEntityType(), info.getExternalId().getId().toString()), request.getVersionId());

View File

@ -12,14 +12,14 @@
"transportConfiguration": {
"observeAttr": {
"observe": [
"/3_1.1/0/0"
"/3_1.0/0/0"
],
"attribute": [],
"telemetry": [
"/3_1.1/0/0"
"/3_1.0/0/0"
],
"keyName": {
"/3_1.1/0/0": "testData"
"/3_1.0/0/0": "testData"
},
"attributeLwm2m": {}
},
@ -44,8 +44,7 @@
"clientOnlyObserveAfterConnect": 1,
"fwUpdateStrategy": 1,
"swUpdateStrategy": 1,
"powerMode": "DRX",
"compositeOperationsSupport": false
"powerMode": "DRX"
},
"bootstrapServerUpdateEnable": false,
"type": "LWM2M"

View File

@ -23,9 +23,9 @@
<Description1>
<![CDATA[]]></Description1>
<ObjectID>3</ObjectID>
<ObjectURN>urn:oma:lwm2m:oma:3:1.1</ObjectURN>
<ObjectURN>urn:oma:lwm2m:oma:3</ObjectURN>
<LWM2MVersion>1.1</LWM2MVersion>
<ObjectVersion>1.1</ObjectVersion>
<ObjectVersion>1.0</ObjectVersion>
<MultipleInstances>Single</MultipleInstances>
<Mandatory>Mandatory</Mandatory>
<Resources>

View File

@ -2,5 +2,5 @@
"title": "",
"resourceType": "LWM2M_MODEL",
"fileName": "test-model.xml",
"data": "PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4KPCEtLQoKICAgIENvcHlyaWdodCDCqSAyMDE2LTIwMjIgVGhlIFRoaW5nc2JvYXJkIEF1dGhvcnMKCiAgICBMaWNlbnNlZCB1bmRlciB0aGUgQXBhY2hlIExpY2Vuc2UsIFZlcnNpb24gMi4wICh0aGUgIkxpY2Vuc2UiKTsKICAgIHlvdSBtYXkgbm90IHVzZSB0aGlzIGZpbGUgZXhjZXB0IGluIGNvbXBsaWFuY2Ugd2l0aCB0aGUgTGljZW5zZS4KICAgIFlvdSBtYXkgb2J0YWluIGEgY29weSBvZiB0aGUgTGljZW5zZSBhdAoKICAgICAgICBodHRwOi8vd3d3LmFwYWNoZS5vcmcvbGljZW5zZXMvTElDRU5TRS0yLjAKCiAgICBVbmxlc3MgcmVxdWlyZWQgYnkgYXBwbGljYWJsZSBsYXcgb3IgYWdyZWVkIHRvIGluIHdyaXRpbmcsIHNvZnR3YXJlCiAgICBkaXN0cmlidXRlZCB1bmRlciB0aGUgTGljZW5zZSBpcyBkaXN0cmlidXRlZCBvbiBhbiAiQVMgSVMiIEJBU0lTLAogICAgV0lUSE9VVCBXQVJSQU5USUVTIE9SIENPTkRJVElPTlMgT0YgQU5ZIEtJTkQsIGVpdGhlciBleHByZXNzIG9yIGltcGxpZWQuCiAgICBTZWUgdGhlIExpY2Vuc2UgZm9yIHRoZSBzcGVjaWZpYyBsYW5ndWFnZSBnb3Zlcm5pbmcgcGVybWlzc2lvbnMgYW5kCiAgICBsaW1pdGF0aW9ucyB1bmRlciB0aGUgTGljZW5zZS4KCi0tPgo8TFdNMk0geG1sbnM6eHNpPSJodHRwOi8vd3d3LnczLm9yZy8yMDAxL1hNTFNjaGVtYS1pbnN0YW5jZSIKICAgICAgIHhzaTpub05hbWVzcGFjZVNjaGVtYUxvY2F0aW9uPSJodHRwOi8vd3d3Lm9wZW5tb2JpbGVhbGxpYW5jZS5vcmcvdGVjaC9wcm9maWxlcy9MV00yTS12MV8xLnhzZCI+CiAgICA8T2JqZWN0IE9iamVjdFR5cGU9Ik1PRGVmaW5pdGlvbiI+CiAgICAgICAgPE5hbWU+THdNMk0gTW9uaXRvcmluZzwvTmFtZT4KICAgICAgICA8RGVzY3JpcHRpb24xPgogICAgICAgICAgICA8IVtDREFUQVtdXT48L0Rlc2NyaXB0aW9uMT4KICAgICAgICA8T2JqZWN0SUQ+MzwvT2JqZWN0SUQ+CiAgICAgICAgPE9iamVjdFVSTj51cm46b21hOmx3bTJtOm9tYTozOjEuMTwvT2JqZWN0VVJOPgogICAgICAgIDxMV00yTVZlcnNpb24+MS4xPC9MV00yTVZlcnNpb24+CiAgICAgICAgPE9iamVjdFZlcnNpb24+MS4xPC9PYmplY3RWZXJzaW9uPgogICAgICAgIDxNdWx0aXBsZUluc3RhbmNlcz5TaW5nbGU8L011bHRpcGxlSW5zdGFuY2VzPgogICAgICAgIDxNYW5kYXRvcnk+TWFuZGF0b3J5PC9NYW5kYXRvcnk+CiAgICAgICAgPFJlc291cmNlcz4KICAgICAgICAgICAgPEl0ZW0gSUQ9IjAiPgogICAgICAgICAgICAgICAgPE5hbWU+VGVzdCBkYXRhPC9OYW1lPgogICAgICAgICAgICAgICAgPE9wZXJhdGlvbnM+UjwvT3BlcmF0aW9ucz4KICAgICAgICAgICAgICAgIDxNdWx0aXBsZUluc3RhbmNlcz5TaW5nbGU8L011bHRpcGxlSW5zdGFuY2VzPgogICAgICAgICAgICAgICAgPE1hbmRhdG9yeT5PcHRpb25hbDwvTWFuZGF0b3J5PgogICAgICAgICAgICAgICAgPFR5cGU+U3RyaW5nPC9UeXBlPgogICAgICAgICAgICAgICAgPFJhbmdlRW51bWVyYXRpb24+PC9SYW5nZUVudW1lcmF0aW9uPgogICAgICAgICAgICAgICAgPFVuaXRzPjwvVW5pdHM+CiAgICAgICAgICAgICAgICA8RGVzY3JpcHRpb24+PCFbQ0RBVEFbVGVzdCBkYXRhXV0+PC9EZXNjcmlwdGlvbj4KICAgICAgICAgICAgPC9JdGVtPgogICAgICAgIDwvUmVzb3VyY2VzPgogICAgICAgIDxEZXNjcmlwdGlvbjI+PC9EZXNjcmlwdGlvbjI+CiAgICA8L09iamVjdD4KPC9MV00yTT4K"
"data": "PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4KPCEtLQoKICAgIENvcHlyaWdodCDCqSAyMDE2LTIwMjQgVGhlIFRoaW5nc2JvYXJkIEF1dGhvcnMKCiAgICBMaWNlbnNlZCB1bmRlciB0aGUgQXBhY2hlIExpY2Vuc2UsIFZlcnNpb24gMi4wICh0aGUgIkxpY2Vuc2UiKTsKICAgIHlvdSBtYXkgbm90IHVzZSB0aGlzIGZpbGUgZXhjZXB0IGluIGNvbXBsaWFuY2Ugd2l0aCB0aGUgTGljZW5zZS4KICAgIFlvdSBtYXkgb2J0YWluIGEgY29weSBvZiB0aGUgTGljZW5zZSBhdAoKICAgICAgICBodHRwOi8vd3d3LmFwYWNoZS5vcmcvbGljZW5zZXMvTElDRU5TRS0yLjAKCiAgICBVbmxlc3MgcmVxdWlyZWQgYnkgYXBwbGljYWJsZSBsYXcgb3IgYWdyZWVkIHRvIGluIHdyaXRpbmcsIHNvZnR3YXJlCiAgICBkaXN0cmlidXRlZCB1bmRlciB0aGUgTGljZW5zZSBpcyBkaXN0cmlidXRlZCBvbiBhbiAiQVMgSVMiIEJBU0lTLAogICAgV0lUSE9VVCBXQVJSQU5USUVTIE9SIENPTkRJVElPTlMgT0YgQU5ZIEtJTkQsIGVpdGhlciBleHByZXNzIG9yIGltcGxpZWQuCiAgICBTZWUgdGhlIExpY2Vuc2UgZm9yIHRoZSBzcGVjaWZpYyBsYW5ndWFnZSBnb3Zlcm5pbmcgcGVybWlzc2lvbnMgYW5kCiAgICBsaW1pdGF0aW9ucyB1bmRlciB0aGUgTGljZW5zZS4KCi0tPgo8TFdNMk0geG1sbnM6eHNpPSJodHRwOi8vd3d3LnczLm9yZy8yMDAxL1hNTFNjaGVtYS1pbnN0YW5jZSIKICAgICAgIHhzaTpub05hbWVzcGFjZVNjaGVtYUxvY2F0aW9uPSJodHRwOi8vd3d3Lm9wZW5tb2JpbGVhbGxpYW5jZS5vcmcvdGVjaC9wcm9maWxlcy9MV00yTS12MV8xLnhzZCI+CiAgICA8T2JqZWN0IE9iamVjdFR5cGU9Ik1PRGVmaW5pdGlvbiI+CiAgICAgICAgPE5hbWU+THdNMk0gTW9uaXRvcmluZzwvTmFtZT4KICAgICAgICA8RGVzY3JpcHRpb24xPgogICAgICAgICAgICA8IVtDREFUQVtdXT48L0Rlc2NyaXB0aW9uMT4KICAgICAgICA8T2JqZWN0SUQ+MzwvT2JqZWN0SUQ+CiAgICAgICAgPE9iamVjdFVSTj51cm46b21hOmx3bTJtOm9tYTozPC9PYmplY3RVUk4+CiAgICAgICAgPExXTTJNVmVyc2lvbj4xLjE8L0xXTTJNVmVyc2lvbj4KICAgICAgICA8T2JqZWN0VmVyc2lvbj4xLjA8L09iamVjdFZlcnNpb24+CiAgICAgICAgPE11bHRpcGxlSW5zdGFuY2VzPlNpbmdsZTwvTXVsdGlwbGVJbnN0YW5jZXM+CiAgICAgICAgPE1hbmRhdG9yeT5NYW5kYXRvcnk8L01hbmRhdG9yeT4KICAgICAgICA8UmVzb3VyY2VzPgogICAgICAgICAgICA8SXRlbSBJRD0iMCI+CiAgICAgICAgICAgICAgICA8TmFtZT5UZXN0IGRhdGE8L05hbWU+CiAgICAgICAgICAgICAgICA8T3BlcmF0aW9ucz5SPC9PcGVyYXRpb25zPgogICAgICAgICAgICAgICAgPE11bHRpcGxlSW5zdGFuY2VzPlNpbmdsZTwvTXVsdGlwbGVJbnN0YW5jZXM+CiAgICAgICAgICAgICAgICA8TWFuZGF0b3J5Pk9wdGlvbmFsPC9NYW5kYXRvcnk+CiAgICAgICAgICAgICAgICA8VHlwZT5TdHJpbmc8L1R5cGU+CiAgICAgICAgICAgICAgICA8UmFuZ2VFbnVtZXJhdGlvbj48L1JhbmdlRW51bWVyYXRpb24+CiAgICAgICAgICAgICAgICA8VW5pdHM+PC9Vbml0cz4KICAgICAgICAgICAgICAgIDxEZXNjcmlwdGlvbj48IVtDREFUQVtUZXN0IGRhdGFdXT48L0Rlc2NyaXB0aW9uPgogICAgICAgICAgICA8L0l0ZW0+CiAgICAgICAgPC9SZXNvdXJjZXM+CiAgICAgICAgPERlc2NyaXB0aW9uMj48L0Rlc2NyaXB0aW9uMj4KICAgIDwvT2JqZWN0Pgo8L0xXTTJNPgo="
}

View File

@ -91,26 +91,16 @@
<tb-power-mode-settings [parentForm]="clientSettingsFormGroup">
</tb-power-mode-settings>
</fieldset>
<!-- <mat-accordion multi="true">-->
<!-- <div *ngIf="false">-->
<!-- <mat-expansion-panel>-->
<!-- <mat-expansion-panel-header>-->
<!-- <mat-panel-title>{{ 'device-profile.lwm2m.client-strategy' | translate }}</mat-panel-title>-->
<!-- </mat-expansion-panel-header>-->
<!-- <ng-template matExpansionPanelContent>-->
<!-- <div fxLayout="column">-->
<!-- <mat-form-field class="mat-block">-->
<!-- <mat-label>{{ 'device-profile.lwm2m.client-strategy-label' | translate }}</mat-label>-->
<!-- <mat-select formControlName="clientOnlyObserveAfterConnect">-->
<!-- <mat-option value=1>{{ 'device-profile.lwm2m.client-strategy-only-observe' | translate }}</mat-option>-->
<!-- <mat-option value=2>{{ 'device-profile.lwm2m.client-strategy-read-all' | translate }}</mat-option>-->
<!-- </mat-select>-->
<!-- </mat-form-field>-->
<!-- </div>-->
<!-- </ng-template>-->
<!-- </mat-expansion-panel>-->
<!-- </div>-->
<!-- </mat-accordion>-->
<fieldset class="fields-group">
<legend class="group-title" translate>device-profile.lwm2m.default-object-id</legend>
<mat-form-field fxFlex>
<mat-select formControlName="defaultObjectIDVer">
<mat-option *ngFor="let objectIDVer of objectIDVers" [value]="objectIDVer">
{{ objectIDVerTranslationMap.get(objectIDVer) | translate }}
</mat-option>
</mat-select>
</mat-form-field>
</fieldset>
</section>
</ng-template>
</mat-tab>

View File

@ -40,9 +40,11 @@ import {
ObjectLwM2M,
OBSERVE,
PowerMode,
ObjectIDVer,
RESOURCES,
ServerSecurityConfig,
TELEMETRY
TELEMETRY,
ObjectIDVerTranslationMap
} from './lwm2m-profile-config.models';
import { DeviceProfileService } from '@core/http/device-profile.service';
import { deepClone, isDefinedAndNotNull, isEmpty, isUndefined } from '@core/utils';
@ -80,6 +82,10 @@ export class Lwm2mDeviceProfileTransportConfigurationComponent implements Contro
lwm2mDeviceProfileFormGroup: UntypedFormGroup;
configurationValue: Lwm2mProfileConfigModels;
objectIDVers = Object.values(ObjectIDVer) as ObjectIDVer[];
objectIDVerTranslationMap = ObjectIDVerTranslationMap;
sortFunction: (key: string, value: object) => object;
get required(): boolean {
@ -116,7 +122,8 @@ export class Lwm2mDeviceProfileTransportConfigurationComponent implements Contro
powerMode: [PowerMode.DRX, Validators.required],
edrxCycle: [{disabled: true, value: 0}, Validators.required],
psmActivityTimer: [{disabled: true, value: 0}, Validators.required],
pagingTransmissionWindow: [{disabled: true, value: 0}, Validators.required]
pagingTransmissionWindow: [{disabled: true, value: 0}, Validators.required],
defaultObjectIDVer: [ObjectIDVer.V1_0, Validators.required]
})
});
@ -274,7 +281,8 @@ export class Lwm2mDeviceProfileTransportConfigurationComponent implements Contro
edrxCycle: this.configurationValue.clientLwM2mSettings.edrxCycle || DEFAULT_EDRX_CYCLE,
pagingTransmissionWindow:
this.configurationValue.clientLwM2mSettings.pagingTransmissionWindow || DEFAULT_PAGING_TRANSMISSION_WINDOW,
psmActivityTimer: this.configurationValue.clientLwM2mSettings.psmActivityTimer || DEFAULT_PSM_ACTIVITY_TIMER
psmActivityTimer: this.configurationValue.clientLwM2mSettings.psmActivityTimer || DEFAULT_PSM_ACTIVITY_TIMER,
defaultObjectIDVer: this.configurationValue.clientLwM2mSettings.defaultObjectIDVer || ObjectIDVer.V1_0
}
},
{emitEvent: false});

View File

@ -122,6 +122,18 @@ export const PowerModeTranslationMap = new Map<PowerMode, string>(
]
);
export enum ObjectIDVer {
V1_0 = '1.0',
V1_1 = '1.1'
}
export const ObjectIDVerTranslationMap = new Map<ObjectIDVer, string>(
[
[ObjectIDVer.V1_0, 'device-profile.lwm2m.default-object-id-ver.v1-0'],
[ObjectIDVer.V1_1, 'device-profile.lwm2m.default-object-id-ver.v1-1']
]
);
export interface ServerSecurityConfig {
host?: string;
port?: number;
@ -163,7 +175,7 @@ export interface ClientLwM2mSettings {
edrxCycle?: number;
pagingTransmissionWindow?: number;
psmActivityTimer?: number;
compositeOperationsSupport: boolean;
defaultObjectIDVer: ObjectIDVer;
}
export interface ObservableAttributes {
@ -190,7 +202,7 @@ export function getDefaultProfileClientLwM2mSettingsConfig(): ClientLwM2mSetting
fwUpdateStrategy: 1,
swUpdateStrategy: 1,
powerMode: PowerMode.DRX,
compositeOperationsSupport: false
defaultObjectIDVer: ObjectIDVer.V1_0
};
}

View File

@ -2078,8 +2078,7 @@
"step": "خطوة",
"min-evaluation-period": "الفترة الدنيا للتقييم",
"max-evaluation-period": "الفترة القصوى للتقييم"
},
"composite-operations-support": "يدعم عمليات القراءة/الكتابة/المراقبة المركبة"
}
},
"snmp": {
"add-communication-config": "إضافة تكوين الاتصال",

View File

@ -1899,6 +1899,11 @@
"step": "Step",
"min-evaluation-period": "Minimum evaluation period",
"max-evaluation-period": "Maximum evaluation period"
},
"default-object-id": "Default Object Version (Attribute)",
"default-object-id-ver": {
"v1-0": "1.0",
"v1-1": "1.1"
}
},
"snmp": {