Merge branch 'rc' of github.com:thingsboard/thingsboard into rc

This commit is contained in:
Igor Kulikov 2025-04-10 18:00:32 +03:00
commit b842fd773a
9 changed files with 29 additions and 19 deletions

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.service.queue; package org.thingsboard.server.service.queue;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
@ -70,7 +69,6 @@ import java.util.stream.Collectors;
@Service @Service
@TbRuleEngineComponent @TbRuleEngineComponent
@Slf4j
public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBasedConsumerService<ToCalculatedFieldNotificationMsg> implements TbCalculatedFieldConsumerService { public class DefaultTbCalculatedFieldConsumerService extends AbstractPartitionBasedConsumerService<ToCalculatedFieldNotificationMsg> implements TbCalculatedFieldConsumerService {
@Value("${queue.calculated_fields.poll_interval:25}") @Value("${queue.calculated_fields.poll_interval:25}")

View File

@ -20,7 +20,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
@ -118,7 +117,6 @@ import java.util.stream.Collectors;
@Service @Service
@TbCoreComponent @TbCoreComponent
@Slf4j
public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCoreNotificationMsg> implements TbCoreConsumerService { public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCoreNotificationMsg> implements TbCoreConsumerService {
@Value("${queue.core.poll-interval}") @Value("${queue.core.poll-interval}")

View File

@ -19,8 +19,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.Nullable;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -45,12 +43,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeNotificationMsg;
import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.queue.processing.IdMsgPair;
@ -66,7 +64,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j
@Service @Service
@TbCoreComponent @TbCoreComponent
public class DefaultTbEdgeConsumerService extends AbstractConsumerService<ToEdgeNotificationMsg> implements TbEdgeConsumerService { public class DefaultTbEdgeConsumerService extends AbstractConsumerService<ToEdgeNotificationMsg> implements TbEdgeConsumerService {

View File

@ -15,7 +15,6 @@
*/ */
package org.thingsboard.server.service.queue; package org.thingsboard.server.service.queue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
@ -65,7 +64,6 @@ import java.util.stream.Collectors;
@Service @Service
@TbRuleEngineComponent @TbRuleEngineComponent
@Slf4j
public class DefaultTbRuleEngineConsumerService extends AbstractPartitionBasedConsumerService<ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService { public class DefaultTbRuleEngineConsumerService extends AbstractPartitionBasedConsumerService<ToRuleEngineNotificationMsg> implements TbRuleEngineConsumerService {
private final TbRuleEngineConsumerContext ctx; private final TbRuleEngineConsumerContext ctx;

View File

@ -17,7 +17,8 @@ package org.thingsboard.server.service.queue.processing;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
@ -62,10 +63,11 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public abstract class AbstractConsumerService<N extends com.google.protobuf.GeneratedMessageV3> extends TbApplicationEventListener<PartitionChangeEvent> { public abstract class AbstractConsumerService<N extends com.google.protobuf.GeneratedMessageV3> extends TbApplicationEventListener<PartitionChangeEvent> {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected final ActorSystemContext actorContext; protected final ActorSystemContext actorContext;
protected final TbTenantProfileCache tenantProfileCache; protected final TbTenantProfileCache tenantProfileCache;
protected final TbDeviceProfileCache deviceProfileCache; protected final TbDeviceProfileCache deviceProfileCache;

View File

@ -28,6 +28,8 @@ import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -35,7 +37,7 @@ public abstract class AbstractPartitionBasedConsumerService<N extends com.google
private final Lock startupLock = new ReentrantLock(); private final Lock startupLock = new ReentrantLock();
private volatile boolean started = false; private volatile boolean started = false;
private PartitionChangeEvent lastPartitionChangeEvent; private List<PartitionChangeEvent> pendingEvents = new ArrayList<>();
public AbstractPartitionBasedConsumerService(ActorSystemContext actorContext, public AbstractPartitionBasedConsumerService(ActorSystemContext actorContext,
TbTenantProfileCache tenantProfileCache, TbTenantProfileCache tenantProfileCache,
@ -61,8 +63,16 @@ public abstract class AbstractPartitionBasedConsumerService<N extends com.google
onStartUp(); onStartUp();
startupLock.lock(); startupLock.lock();
try { try {
onPartitionChangeEvent(lastPartitionChangeEvent); for (PartitionChangeEvent partitionChangeEvent : pendingEvents) {
log.info("Handling partition change event: {}", partitionChangeEvent);
try {
onPartitionChangeEvent(partitionChangeEvent);
} catch (Throwable t) {
log.error("Failed to handle partition change event: {}", partitionChangeEvent, t);
}
}
started = true; started = true;
pendingEvents = null;
} finally { } finally {
startupLock.unlock(); startupLock.unlock();
} }
@ -70,17 +80,20 @@ public abstract class AbstractPartitionBasedConsumerService<N extends com.google
@Override @Override
protected void onTbApplicationEvent(PartitionChangeEvent event) { protected void onTbApplicationEvent(PartitionChangeEvent event) {
log.debug("Received partition change event: {}", event);
if (!started) { if (!started) {
startupLock.lock(); startupLock.lock();
try { try {
if (!started) { if (!started) {
lastPartitionChangeEvent = event; log.debug("App not started yet, storing event for later: {}", event);
pendingEvents.add(event);
return; return;
} }
} finally { } finally {
startupLock.unlock(); startupLock.unlock();
} }
} }
log.info("Handling partition change event: {}", event);
onPartitionChangeEvent(event); onPartitionChangeEvent(event);
} }

View File

@ -1423,7 +1423,7 @@ device:
pem_cert_file: "${DEVICE_CONNECTIVITY_COAPS_CA_ROOT_CERT:cafile.pem}" pem_cert_file: "${DEVICE_CONNECTIVITY_COAPS_CA_ROOT_CERT:cafile.pem}"
gateway: gateway:
# The docker tag for thingsboard/tb-gateway image used in docker-compose file for gateway launch # The docker tag for thingsboard/tb-gateway image used in docker-compose file for gateway launch
image_version: "${DEVICE_CONNECTIVITY_GATEWAY_IMAGE_VERSION:latest}" image_version: "${DEVICE_CONNECTIVITY_GATEWAY_IMAGE_VERSION:3.7-stable}"
# Edges parameters # Edges parameters
edges: edges:

View File

@ -23,6 +23,7 @@ import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
@ -94,6 +95,9 @@ public class DeviceConnectivityControllerTest extends AbstractControllerTest {
private DeviceProfileId mqttDeviceProfileId; private DeviceProfileId mqttDeviceProfileId;
private DeviceProfileId coapDeviceProfileId; private DeviceProfileId coapDeviceProfileId;
@Value("${device.connectivity.gateway.image_version:3.7-stable}")
private String gatewayImageVersion;
@Before @Before
public void beforeTest() throws Exception { public void beforeTest() throws Exception {
loginSysAdmin(); loginSysAdmin();
@ -298,7 +302,7 @@ public class DeviceConnectivityControllerTest extends AbstractControllerTest {
"services:\n" + "services:\n" +
" # ThingsBoard IoT Gateway Service Configuration\n" + " # ThingsBoard IoT Gateway Service Configuration\n" +
" tb-gateway:\n" + " tb-gateway:\n" +
" image: thingsboard/tb-gateway:latest\n" + " image: thingsboard/tb-gateway:" + gatewayImageVersion + "\n" +
" container_name: tb-gateway\n" + " container_name: tb-gateway\n" +
" restart: always\n" + " restart: always\n" +
"\n" + "\n" +
@ -847,7 +851,7 @@ public class DeviceConnectivityControllerTest extends AbstractControllerTest {
"-t \"application/json\" -e \"{temperature:25}\" coap://test.domain:5683/api/v1/%s/telemetry", credentials.getCredentialsId())); "-t \"application/json\" -e \"{temperature:25}\" coap://test.domain:5683/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAPS).get(0).asText()).isEqualTo("curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download"); assertThat(linuxCoapCommands.get(COAPS).get(0).asText()).isEqualTo("curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download");
assertThat(linuxCoapCommands.get(COAPS).get(1).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST " + assertThat(linuxCoapCommands.get(COAPS).get(1).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST " +
"-R "+ CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://test.domain:5684/api/v1/%s/telemetry", credentials.getCredentialsId())); "-R " + CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://test.domain:5684/api/v1/%s/telemetry", credentials.getCredentialsId()));
JsonNode dockerCoapCommands = commands.get(COAP).get(DOCKER); JsonNode dockerCoapCommands = commands.get(COAP).get(DOCKER);
assertThat(dockerCoapCommands.get(COAP).asText()).isEqualTo(String.format("docker run --rm -it " + assertThat(dockerCoapCommands.get(COAP).asText()).isEqualTo(String.format("docker run --rm -it " +

View File

@ -85,7 +85,7 @@ public class DeviceConnectivityServiceImpl implements DeviceConnectivityService
private String mqttsPemCertFile; private String mqttsPemCertFile;
@Value("${device.connectivity.coaps.pem_cert_file:}") @Value("${device.connectivity.coaps.pem_cert_file:}")
private String coapsPemCertFile; private String coapsPemCertFile;
@Value("${device.connectivity.gateway.image_version:latest}") @Value("${device.connectivity.gateway.image_version:3.7-stable}")
private String gatewayImageVersion; private String gatewayImageVersion;
@Override @Override