Merge branch 'master' of github.com:thingsboard/thingsboard

This commit is contained in:
Igor Kulikov 2024-12-03 13:23:34 +02:00
commit 2677bfdc96
46 changed files with 307 additions and 235 deletions

View File

@ -31,7 +31,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.cache.TbTransactionalCache;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.AttributeScope;
@ -75,7 +75,6 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -191,9 +190,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
log.error("Failed to start Edge RPC server!", e);
throw new RuntimeException("Failed to start Edge RPC server!");
}
this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-event-check-scheduler"));
this.sendDownlinkExecutorService = Executors.newScheduledThreadPool(sendSchedulerPoolSize, ThingsBoardThreadFactory.forName("edge-send-scheduler"));
this.executorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("edge-service"));
this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler");
this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler");
this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service");
log.info("Edge RPC service initialized!");
}

View File

@ -21,10 +21,9 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ExecutorProvider;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Lazy
@ -46,7 +45,7 @@ public class PubSubRuleNodeExecutorProvider implements ExecutorProvider {
if (threadPoolSize == null) {
threadPoolSize = THREADS_PER_CPU * Runtime.getRuntime().availableProcessors();
}
executor = Executors.newScheduledThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("pubsub-rule-nodes"));
executor = ThingsBoardExecutors.newScheduledThreadPool(threadPoolSize, "pubsub-rule-nodes");
}
@Override

View File

@ -35,7 +35,7 @@ import org.springframework.mail.javamail.JavaMailSenderImpl;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Service;
import org.springframework.ui.freemarker.FreeMarkerTemplateUtils;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.TbEmail;
import org.thingsboard.server.cache.limits.RateLimitService;
@ -60,7 +60,6 @@ import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -111,7 +110,7 @@ public class DefaultMailService implements MailService {
this.freemarkerConfig = freemarkerConfig;
this.adminSettingsService = adminSettingsService;
this.apiUsageClient = apiUsageClient;
this.timeoutScheduler = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("mail-service-watchdog"));
this.timeoutScheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("mail-service-watchdog");
}
@PostConstruct

View File

@ -67,9 +67,6 @@ public class TbMailSender extends JavaMailSenderImpl {
if (jsonConfig.has("password")) {
setPassword(jsonConfig.get("password").asText());
}
if (jsonConfig.has("mailFrom")) {
setUsername(jsonConfig.get("mailFrom").asText());
}
setJavaMailProperties(createJavaMailProperties(jsonConfig));
}

View File

@ -23,7 +23,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.NotificationRequestId;
@ -49,7 +49,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -63,7 +62,7 @@ public class DefaultNotificationSchedulerService extends AbstractPartitionBasedS
private final NotificationCenter notificationCenter;
private final NotificationRequestService notificationRequestService;
private final NotificationExecutorService notificationExecutor;
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("notification-scheduler"));
private final ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("notification-scheduler");
private final Map<NotificationRequestId, ScheduledRequestMetadata> scheduledNotificationRequests = new ConcurrentHashMap<>();

View File

@ -22,7 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@ -40,7 +40,6 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
@Slf4j
public abstract class AbstractPartitionBasedService<T extends EntityId> extends TbApplicationEventListener<PartitionChangeEvent> {
@ -67,7 +66,7 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends
protected void init() {
// Should be always single threaded due to absence of locks.
scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getSchedulerExecutorName())));
scheduledExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newSingleThreadScheduledExecutor(getSchedulerExecutorName()));
}
protected ServiceType getServiceType() {

View File

@ -82,7 +82,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
public void init(String prefix) {
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName(prefix + "-consumer"));
this.mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(getMgmtThreadPoolSize(), prefix + "-mgmt");
this.scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(prefix + "-consumer-scheduler"));
this.scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor(prefix + "-consumer-scheduler");
this.nfConsumer = QueueConsumerManager.<TbProtoQueueMsg<N>>builder()
.name(getServiceType().getLabel() + " Notifications")

View File

@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
@ -46,7 +46,6 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -86,7 +85,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
@PostConstruct
public void initExecutor() {
scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-core-rpc-scheduler"));
scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("tb-core-rpc-scheduler");
serviceId = serviceInfoProvider.getServiceId();
}

View File

@ -20,7 +20,7 @@ import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse;
import org.thingsboard.server.cluster.TbClusterService;
@ -45,7 +45,6 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -83,7 +82,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi
@PostConstruct
public void initExecutor() {
scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("rule-engine-rpc-scheduler"));
scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("rule-engine-rpc-scheduler");
serviceId = serviceInfoProvider.getServiceId();
}

View File

@ -19,7 +19,7 @@ import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
@ -30,7 +30,6 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -51,7 +50,7 @@ public class DefaultRuleEngineCallService implements RuleEngineCallService {
@PostConstruct
public void initExecutor() {
executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("re-rest-callback"));
executor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("re-rest-callback");
}
@PreDestroy

View File

@ -31,6 +31,7 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.CloseStatus;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
@ -90,7 +91,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -159,11 +159,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
serviceId = serviceInfoProvider.getServiceId();
wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-entity-sub-callback"));
tsInSqlDB = databaseTsType.equalsIgnoreCase("sql") || databaseTsType.equalsIgnoreCase("timescale");
ThreadFactory tbThreadFactory = ThingsBoardThreadFactory.forName("ws-entity-sub-scheduler");
if (dynamicPageLinkRefreshPoolSize == 1) {
scheduler = Executors.newSingleThreadScheduledExecutor(tbThreadFactory);
scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("ws-entity-sub-scheduler");
} else {
scheduler = Executors.newScheduledThreadPool(dynamicPageLinkRefreshPoolSize, tbThreadFactory);
scheduler = ThingsBoardExecutors.newScheduledThreadPool(dynamicPageLinkRefreshPoolSize, "ws-entity-sub-scheduler");
}
}

View File

@ -130,7 +130,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass());
tsCallBackExecutor = Executors.newFixedThreadPool(8, ThingsBoardThreadFactory.forName("ts-sub-callback")); //since we are using locks by TenantId
serviceId = serviceInfoProvider.getServiceId();
staleSessionCleanupExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("stale-session-cleanup"));
staleSessionCleanupExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("stale-session-cleanup");
staleSessionCleanupExecutor.scheduleWithFixedDelay(this::cleanupStaleSessions, 60, 60, TimeUnit.SECONDS);
}

View File

@ -19,10 +19,9 @@ import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -36,7 +35,7 @@ public class SubscriptionSchedulerComponent {
@PostConstruct
public void initExecutor() {
scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("subscription-scheduler"));
scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("subscription-scheduler");
}
@PreDestroy

View File

@ -20,7 +20,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.sync.vc.GitRepository;
@ -32,7 +32,6 @@ import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -44,7 +43,7 @@ public class DefaultGitSyncService implements GitSyncService {
@Value("${vc.git.repositories-folder:${java.io.tmpdir}/repositories}")
private String repositoriesFolder;
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("git-sync"));
private final ScheduledExecutorService executor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("git-sync");
private final Map<String, GitRepository> repositories = new ConcurrentHashMap<>();
private final Map<String, Runnable> updateListeners = new ConcurrentHashMap<>();

View File

@ -24,7 +24,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.SmsService;
import org.thingsboard.server.common.data.AdminSettings;
@ -55,7 +55,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -106,7 +105,7 @@ public class DefaultSystemInfoService extends TbApplicationEventListener<Partiti
synchronized (this) {
if (myPartition) {
if (scheduler == null) {
scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-system-info-scheduler"));
scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("tb-system-info-scheduler");
scheduler.scheduleWithFixedDelay(this::saveCurrentSystemInfo, 0, systemInfoPersistFrequencySeconds, TimeUnit.SECONDS);
}
} else {

View File

@ -27,7 +27,7 @@ import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.EdgeUpgradeMessage;
import org.thingsboard.server.common.data.UpdateMessage;
import org.thingsboard.server.common.data.notification.rule.trigger.NewPlatformVersionTrigger;
@ -42,7 +42,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -74,7 +73,7 @@ public class DefaultUpdateService implements UpdateService {
@Autowired(required = false)
private EdgeUpgradeInstructionsService edgeUpgradeInstructionsService;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-update-service"));
private final ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("tb-update-service");
private ScheduledFuture<?> checkUpdatesFuture = null;
private final RestTemplate restClient = new RestTemplate();

View File

@ -33,7 +33,6 @@ import org.springframework.stereotype.Service;
import org.springframework.web.socket.CloseStatus;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TenantProfile;
@ -98,7 +97,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@ -158,7 +156,7 @@ public class DefaultWebSocketService implements WebSocketService {
serviceId = serviceInfoProvider.getServiceId();
executor = ThingsBoardExecutors.newWorkStealingPool(50, getClass());
pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping"));
pingExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("telemetry-web-socket-ping");
pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS);
cmdsHandlers = new EnumMap<>(WsCmdType.class);

View File

@ -157,7 +157,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
when(producerProvider.getRuleEngineMsgProducer()).thenReturn(ruleEngineMsgProducer);
consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer"));
mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(3, "tb-rule-engine-mgmt");
scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-scheduler"));
scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("tb-rule-engine-consumer-scheduler");
ruleEngineConsumerContext.setTopicDeletionDelayInSec(5);
queue = new Queue();

View File

@ -23,7 +23,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.TenantId;
@ -37,7 +37,6 @@ import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -63,7 +62,7 @@ public class DefaultRuleEngineCallServiceTest {
@BeforeEach
void setUp() {
executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("re-rest-callback"));
executor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("re-rest-callback");
ruleEngineCallService = new DefaultRuleEngineCallService(tbClusterServiceMock);
ReflectionTestUtils.setField(ruleEngineCallService, "executor", executor);
ReflectionTestUtils.setField(ruleEngineCallService, "requests", requests);

View File

@ -33,7 +33,7 @@ import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.http.HttpStatus;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
@ -82,7 +82,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -198,7 +197,7 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte
}
private void init() throws Exception {
executor = Executors.newScheduledThreadPool(10, ThingsBoardThreadFactory.forName("test-lwm2m-scheduled"));
executor = ThingsBoardExecutors.newScheduledThreadPool(10, "test-lwm2m-scheduled");
loginTenantAdmin();
for (String resourceName : this.resources) {
TbResource lwModel = new TbResource();

View File

@ -24,12 +24,11 @@ import org.eclipse.leshan.core.request.argument.Arguments;
import org.eclipse.leshan.core.response.ExecuteResponse;
import org.eclipse.leshan.core.response.ReadResponse;
import org.eclipse.leshan.core.response.WriteResponse;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import javax.security.auth.Destroyable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -39,7 +38,7 @@ public class FwLwM2MDevice extends BaseInstanceEnabler implements Destroyable {
private static final List<Integer> supportedResources = Arrays.asList(0, 1, 2, 3, 5, 6, 7, 9);
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope"));
private final ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor(getClass().getSimpleName() + "-test-scope");
private final AtomicInteger state = new AtomicInteger(0);

View File

@ -24,12 +24,11 @@ import org.eclipse.leshan.core.request.argument.Arguments;
import org.eclipse.leshan.core.response.ExecuteResponse;
import org.eclipse.leshan.core.response.ReadResponse;
import org.eclipse.leshan.core.response.WriteResponse;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import javax.security.auth.Destroyable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -39,7 +38,7 @@ public class SwLwM2MDevice extends BaseInstanceEnabler implements Destroyable {
private static final List<Integer> supportedResources = Arrays.asList(0, 1, 2, 3, 4, 6, 7, 9);
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope"));
private final ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor(getClass().getSimpleName() + "-test-scope");
private final AtomicInteger state = new AtomicInteger(0);

View File

@ -18,7 +18,7 @@ package org.thingsboard.server.actors;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.msg.TbActorMsg;
import java.util.Collections;
@ -27,7 +27,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@ -51,7 +50,7 @@ public class DefaultTbActorSystem implements TbActorSystem {
public DefaultTbActorSystem(TbActorSystemSettings settings) {
this.settings = settings;
this.scheduler = Executors.newScheduledThreadPool(settings.getSchedulerPoolSize(), ThingsBoardThreadFactory.forName("actor-system-scheduler"));
this.scheduler = ThingsBoardExecutors.newScheduledThreadPool(settings.getSchedulerPoolSize(), "actor-system-scheduler");
}
@Override

View File

@ -27,14 +27,13 @@ import org.eclipse.californium.scandium.DTLSConnector;
import org.eclipse.californium.scandium.config.DtlsConnectorConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -115,7 +114,7 @@ public class DefaultCoapServerService implements CoapServerService {
CoapEndpoint dtlsCoapEndpoint = dtlsCoapEndpointBuilder.build();
server.addEndpoint(dtlsCoapEndpoint);
tbDtlsCertificateVerifier = (TbCoapDtlsCertificateVerifier) dtlsConnectorConfig.getAdvancedCertificateVerifier();
dtlsSessionsExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName()));
dtlsSessionsExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor(getClass().getSimpleName());
dtlsSessionsExecutor.scheduleAtFixedRate(this::evictTimeoutSessions, new Random().nextInt((int) getDtlsSessionReportTimeout()), getDtlsSessionReportTimeout(), TimeUnit.MILLISECONDS);
}
Resource root = server.getRoot();

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.queue.common;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.MessagesStats;
@ -71,7 +72,7 @@ public class DefaultTbQueueResponseTemplate<Request extends TbQueueMsg, Response
this.requestTimeout = requestTimeout;
this.callbackExecutor = executor;
this.stats = stats;
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-queue-response-template-timeout-" + requestTemplate.getTopic()));
this.timeoutExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("tb-queue-response-template-timeout-" + requestTemplate.getTopic());
this.loopExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-queue-response-template-loop-" + requestTemplate.getTopic()));
}

View File

@ -40,7 +40,7 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent;
import org.thingsboard.server.queue.util.AfterStartUp;
@ -48,7 +48,6 @@ import org.thingsboard.server.queue.util.AfterStartUp;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -105,7 +104,7 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi
Assert.notNull(zkConnectionTimeout, missingProperty("zk.connection_timeout_ms"));
Assert.notNull(zkSessionTimeout, missingProperty("zk.session_timeout_ms"));
zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery"));
zkExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("zk-discovery");
log.info("Initializing discovery service using ZK connect string: {}", zkUrl);

View File

@ -30,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
@ -43,7 +43,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -69,7 +68,7 @@ public class TbKafkaConsumerStatsService {
if (!statsConfig.getEnabled()) {
return;
}
this.statsPrintScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("kafka-consumer-stats"));
this.statsPrintScheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("kafka-consumer-stats");
Properties consumerProps = kafkaSettings.toConsumerProps(null);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-stats-loader-client");

View File

@ -26,11 +26,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.concurrent.Executors;
@Slf4j
@ConditionalOnExpression("'${queue.type:null}'=='pubsub'")
@ -71,7 +70,7 @@ public class TbPubSubSettings {
threadPoolSize = THREADS_PER_CPU * Runtime.getRuntime().availableProcessors();
}
executorProvider = FixedExecutorProvider
.create(Executors.newScheduledThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("pubsub-queue-executor")));
.create(ThingsBoardExecutors.newScheduledThreadPool(threadPoolSize, "pubsub-queue-executor"));
}
@PreDestroy

View File

@ -19,10 +19,9 @@ import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -35,7 +34,7 @@ public class DefaultSchedulerComponent implements SchedulerComponent {
@PostConstruct
public void init() {
schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("queue-scheduler"));
schedulerExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("queue-scheduler");
}
@PreDestroy
@ -54,19 +53,11 @@ public class DefaultSchedulerComponent implements SchedulerComponent {
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return schedulerExecutor.scheduleAtFixedRate(() -> runSafely(command), initialDelay, period, unit);
return schedulerExecutor.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return schedulerExecutor.scheduleWithFixedDelay(() -> runSafely(command), initialDelay, delay, unit);
}
private void runSafely(Runnable command) {
try {
command.run();
} catch (Throwable t) {
log.error("Unexpected error occurred while executing task!", t);
}
return schedulerExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}

View File

@ -28,13 +28,12 @@ import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_ADDED;
@ -81,7 +80,7 @@ public class ZkDiscoveryServiceTest {
@BeforeEach
public void setup() {
zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(applicationEventPublisher, serviceInfoProvider, partitionService));
ScheduledExecutorService zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery"));
ScheduledExecutorService zkExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("zk-discovery");
when(client.getState()).thenReturn(CuratorFrameworkState.STARTED);
ReflectionTestUtils.setField(zkDiscoveryService, "stopped", false);
ReflectionTestUtils.setField(zkDiscoveryService, "client", client);

View File

@ -1,95 +0,0 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.scheduler;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.assertj.core.api.Assertions.assertThat;
class DefaultSchedulerComponentTest {
DefaultSchedulerComponent schedulerComponent;
@BeforeEach
void setup() {
schedulerComponent = new DefaultSchedulerComponent();
schedulerComponent.init();
}
@AfterEach
void cleanup() {
schedulerComponent.destroy();
}
@Test
@DisplayName("scheduleAtFixedRate() should continue periodic execution even if command throws exception")
void scheduleAtFixedRateShouldNotStopPeriodicExecutionWhenCommandThrowsException() {
// GIVEN
var wasExecutedAtLeastOnce = new AtomicBoolean(false);
Runnable exceptionThrowingCommand = () -> {
try {
throw new RuntimeException("Unexpected exception");
} finally {
wasExecutedAtLeastOnce.set(true);
}
};
// WHEN
ScheduledFuture<?> future = schedulerComponent.scheduleAtFixedRate(exceptionThrowingCommand, 0, 200, TimeUnit.MILLISECONDS);
// THEN
Awaitility.await().alias("Wait until command is executed at least once")
.atMost(5, TimeUnit.SECONDS)
.until(wasExecutedAtLeastOnce::get);
assertThat(future.isDone()).as("Periodic execution should not stop after unhandled exception is thrown by the command").isFalse();
}
@Test
@DisplayName("scheduleWithFixedDelay() should continue periodic execution even if command throws exception")
void scheduleWithFixedDelayShouldNotStopPeriodicExecutionWhenCommandThrowsException() {
// GIVEN
var wasExecutedAtLeastOnce = new AtomicBoolean(false);
Runnable exceptionThrowingCommand = () -> {
try {
throw new RuntimeException("Unexpected exception");
} finally {
wasExecutedAtLeastOnce.set(true);
}
};
// WHEN
ScheduledFuture<?> future = schedulerComponent.scheduleWithFixedDelay(exceptionThrowingCommand, 0, 200, TimeUnit.MILLISECONDS);
// THEN
Awaitility.await().alias("Wait until command is executed at least once")
.atMost(5, TimeUnit.SECONDS)
.until(wasExecutedAtLeastOnce::get);
assertThat(future.isDone()).as("Periodic execution should not stop after unhandled exception is thrown by the command").isFalse();
}
}

View File

@ -21,7 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId;
@ -29,7 +29,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -87,7 +86,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService
public void init() {
if (getMaxEvalRequestsTimeout() > 0 || getMaxInvokeRequestsTimeout() > 0) {
timeoutExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("script-timeout"));
timeoutExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("script-timeout");
}
}

View File

@ -25,13 +25,13 @@ import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.observation.ObservationIdentifier;
import org.eclipse.leshan.core.observation.SingleObservation;
import org.eclipse.leshan.core.peer.LwM2mIdentity;
import org.eclipse.leshan.core.util.NamedThreadFactory;
import org.eclipse.leshan.server.registration.Deregistration;
import org.eclipse.leshan.server.registration.ExpirationListener;
import org.eclipse.leshan.server.registration.Registration;
import org.eclipse.leshan.server.registration.RegistrationStore;
import org.eclipse.leshan.server.registration.RegistrationUpdate;
import org.eclipse.leshan.server.registration.UpdatedRegistration;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
import org.thingsboard.server.transport.lwm2m.server.LwM2mVersionedModelProvider;
@ -45,7 +45,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -84,9 +83,7 @@ public class TbInMemoryRegistrationStore implements RegistrationStore, Startable
}
public TbInMemoryRegistrationStore(LwM2MTransportServerConfig config, long cleanPeriodInSec, LwM2mVersionedModelProvider modelProvider) {
this(config, Executors.newScheduledThreadPool(1,
new NamedThreadFactory(String.format("TbInMemoryRegistrationStore Cleaner (%ds)", cleanPeriodInSec))),
cleanPeriodInSec, modelProvider);
this(config, ThingsBoardExecutors.newSingleThreadScheduledExecutor(String.format("TbInMemoryRegistrationStore Cleaner (%ds)", cleanPeriodInSec)), cleanPeriodInSec, modelProvider);
}
public TbInMemoryRegistrationStore(LwM2MTransportServerConfig config, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, LwM2mVersionedModelProvider modelProvider) {

View File

@ -24,15 +24,12 @@ import org.eclipse.californium.core.network.serialization.UdpDataSerializer;
import org.eclipse.leshan.core.Destroyable;
import org.eclipse.leshan.core.Startable;
import org.eclipse.leshan.core.Stoppable;
import org.eclipse.leshan.core.model.ObjectModel;
import org.eclipse.leshan.core.model.ResourceModel;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.observation.CompositeObservation;
import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.observation.ObservationIdentifier;
import org.eclipse.leshan.core.observation.SingleObservation;
import org.eclipse.leshan.core.peer.LwM2mIdentity;
import org.eclipse.leshan.core.util.NamedThreadFactory;
import org.eclipse.leshan.core.util.Validate;
import org.eclipse.leshan.server.redis.serialization.ObservationSerDes;
import org.eclipse.leshan.server.redis.serialization.RegistrationSerDes;
@ -48,6 +45,7 @@ import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig;
import org.thingsboard.server.transport.lwm2m.server.LwM2mVersionedModelProvider;
@ -60,7 +58,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -117,9 +114,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
}
public TbLwM2mRedisRegistrationStore(LwM2MTransportServerConfig config, RedisConnectionFactory connectionFactory, long cleanPeriodInSec, long lifetimeGracePeriodInSec, int cleanLimit, LwM2mVersionedModelProvider modelProvider) {
this(config, connectionFactory, Executors.newScheduledThreadPool(1,
new NamedThreadFactory(String.format("RedisRegistrationStore Cleaner (%ds)", cleanPeriodInSec))),
cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit, modelProvider);
this(config, connectionFactory, ThingsBoardExecutors.newSingleThreadScheduledExecutor(String.format("RedisRegistrationStore Cleaner (%ds)", cleanPeriodInSec)), cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit, modelProvider);
}
public TbLwM2mRedisRegistrationStore(LwM2MTransportServerConfig config, RedisConnectionFactory connectionFactory, ScheduledExecutorService schedExecutor, long cleanPeriodInSec,

View File

@ -50,7 +50,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.TbTransportService;
@ -76,7 +75,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -112,7 +110,7 @@ public class SnmpTransportService implements TbTransportService, CommandResponde
@PostConstruct
private void init() throws IOException {
scheduler = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(schedulerThreadPoolSize, ThingsBoardThreadFactory.forName("snmp-querying")));
scheduler = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newScheduledThreadPool(schedulerThreadPoolSize, "snmp-querying"));
executor = ThingsBoardExecutors.newWorkStealingPool(responseProcessingThreadPoolSize, "snmp-response-processing");
initializeSnmp();

View File

@ -16,15 +16,20 @@
package org.thingsboard.common.util;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThingsBoardExecutors {
/** Cannot instantiate. */
private ThingsBoardExecutors(){}
/**
* Method forked from ExecutorService to provide thread poll name
* Method forked from ExecutorService to provide thread pool name
*
* Creates a thread pool that maintains enough threads to support
* the given parallelism level, and may use multiple queues to
@ -63,4 +68,12 @@ public class ThingsBoardExecutors {
return executor;
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name) {
return Executors.unconfigurableScheduledExecutorService(new ThingsBoardScheduledThreadPoolExecutor(1, ThingsBoardThreadFactory.forName(name)));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, String name) {
return new ThingsBoardScheduledThreadPoolExecutor(corePoolSize, ThingsBoardThreadFactory.forName(name));
}
}

View File

@ -0,0 +1,65 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@Slf4j
final class ThingsBoardScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
public ThingsBoardScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (command == null) { // preserve the original NPE behavior of ScheduledThreadPoolExecutor with a more helpful message
throw new NullPointerException("Command is null");
}
return super.scheduleAtFixedRate(new PeriodicRunnable(command), initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null) { // preserve the original NPE behavior of ScheduledThreadPoolExecutor with a more helpful message
throw new NullPointerException("Command is null");
}
return super.scheduleWithFixedDelay(new PeriodicRunnable(command), initialDelay, delay, unit);
}
private record PeriodicRunnable(Runnable runnable) implements Runnable {
public void run() {
try {
runnable.run();
} catch (Exception e) {
// Log exceptions but do not propagate it. This ensures that subsequent scheduled tasks will still run.
log.error("Uncaught exception occurred during periodic task execution!", e);
} catch (Throwable th) {
// Log and rethrow other serious issues that are not regular Exceptions.
log.error("Critical exception occurred during periodic task execution!", th);
throw th;
}
}
}
}

View File

@ -0,0 +1,147 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
class ThingsBoardScheduledThreadPoolExecutorTest {
ThingsBoardScheduledThreadPoolExecutor scheduler;
@BeforeEach
void setup() {
scheduler = new ThingsBoardScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());
}
@AfterEach
void cleanup() {
scheduler.shutdownNow();
}
@Test
@DisplayName("scheduleAtFixedRate() should continue periodic execution even if command throws exception")
void scheduleAtFixedRateShouldNotStopPeriodicExecutionWhenCommandThrowsException() {
// GIVEN
AtomicInteger executionCounter = new AtomicInteger(0);
Runnable exceptionThrowingCommand = () -> {
try {
throw new RuntimeException("Unexpected exception");
} finally {
executionCounter.incrementAndGet();
}
};
// WHEN
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(exceptionThrowingCommand, 0, 100, TimeUnit.MILLISECONDS);
// THEN
Awaitility.await().alias("Wait until command is executed at least twice")
.atMost(10, TimeUnit.SECONDS)
.failFast("Future should not be done or cancelled; task should continue running", () -> future.isDone() || future.isCancelled())
.untilAsserted(() -> assertThat(executionCounter.get())
.as("Task should be executed at least twice")
.isGreaterThan(2));
}
@Test
@DisplayName("scheduleAtFixedRate() should stop periodic execution if command throws an error")
void scheduleAtFixedRateShouldStopPeriodicExecutionWhenCommandThrowsException() {
// GIVEN
AtomicInteger executionCounter = new AtomicInteger(0);
Runnable exceptionThrowingCommand = () -> {
try {
throw new Error("Unexpected error");
} finally {
executionCounter.incrementAndGet();
}
};
// WHEN
scheduler.scheduleAtFixedRate(exceptionThrowingCommand, 0, 100, TimeUnit.MILLISECONDS);
// THEN
Awaitility.await().alias("Command that throws an error should execute exactly once")
.pollDelay(5, TimeUnit.SECONDS)
.atMost(10, TimeUnit.SECONDS)
.failFast("Command that throws an error should not execute more than once", () -> executionCounter.get() > 1)
.until(() -> executionCounter.get() == 1);
}
@Test
@DisplayName("scheduleWithFixedDelay() should continue periodic execution even if command throws exception")
void scheduleWithFixedDelayShouldNotStopPeriodicExecutionWhenCommandThrowsException() {
// GIVEN
AtomicInteger executionCounter = new AtomicInteger(0);
Runnable exceptionThrowingCommand = () -> {
try {
throw new RuntimeException("Unexpected exception");
} finally {
executionCounter.incrementAndGet();
}
};
// WHEN
ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(exceptionThrowingCommand, 0, 100, TimeUnit.MILLISECONDS);
// THEN
Awaitility.await().alias("Wait until command is executed at least twice")
.atMost(10, TimeUnit.SECONDS)
.failFast("Future should not be done or cancelled; task should continue running", () -> future.isDone() || future.isCancelled())
.untilAsserted(() -> assertThat(executionCounter.get())
.as("Task should be executed at least twice")
.isGreaterThan(2));
}
@Test
@DisplayName("scheduleWithFixedDelay() should stop periodic execution if command throws an error")
void scheduleWithFixedDelayShouldStopPeriodicExecutionWhenCommandThrowsException() {
// GIVEN
AtomicInteger executionCounter = new AtomicInteger(0);
Runnable exceptionThrowingCommand = () -> {
try {
throw new Error("Unexpected error");
} finally {
executionCounter.incrementAndGet();
}
};
// WHEN
scheduler.scheduleWithFixedDelay(exceptionThrowingCommand, 0, 100, TimeUnit.MILLISECONDS);
// THEN
Awaitility.await().alias("Command that throws an error should execute exactly once")
.pollDelay(5, TimeUnit.SECONDS)
.atMost(10, TimeUnit.SECONDS)
.failFast("Command that throws an error should not execute more than once", () -> executionCounter.get() > 1)
.until(() -> executionCounter.get() == 1);
}
}

View File

@ -32,7 +32,7 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionalEventListener;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.cache.TbTransactionalCache;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.audit.ActionType;
@ -62,7 +62,6 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
@ -101,7 +100,7 @@ public class BaseRelationService implements RelationService {
@PostConstruct
public void init() {
timeoutExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("relations-query-timeout"));
timeoutExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("relations-query-timeout");
}
@PreDestroy

View File

@ -18,9 +18,8 @@ package org.thingsboard.server.dao.sql;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -31,7 +30,7 @@ public class ScheduledLogExecutorComponent {
@PostConstruct
public void init() {
schedulerLogExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("sql-log"));
schedulerLogExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("sql-log");
}
@PreDestroy

View File

@ -92,7 +92,7 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend
this.queue = new LinkedBlockingDeque<>(queueLimit);
this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads, ThingsBoardThreadFactory.forName("nosql-" + getBufferName() + "-dispatcher"));
this.callbackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreads, "nosql-" + getBufferName() + "-callback");
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nosql-" + getBufferName() + "-timeout"));
this.timeoutExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("nosql-" + getBufferName() + "-timeout");
this.stats = new BufferedRateExecutorStats(statsFactory);
String concurrencyLevelKey = StatsType.RATE_EXECUTOR.getName() + "." + CONCURRENCY_LEVEL + getBufferName(); //metric name may change with buffer name suffix
this.concurrencyLevel = statsFactory.createGauge(concurrencyLevelKey, new AtomicInteger(0));

View File

@ -23,12 +23,11 @@ import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.monitoring.service.BaseMonitoringService;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -51,7 +50,7 @@ public class ThingsboardMonitoringApplication {
@EventListener(ApplicationReadyEvent.class)
public void startMonitoring() {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("monitoring-executor"));
ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("monitoring-executor");
scheduler.scheduleWithFixedDelay(() -> {
monitoringServices.forEach(monitoringService -> {
monitoringService.runChecks();

View File

@ -24,12 +24,11 @@ import org.eclipse.leshan.core.request.argument.Arguments;
import org.eclipse.leshan.core.response.ExecuteResponse;
import org.eclipse.leshan.core.response.ReadResponse;
import org.eclipse.leshan.core.response.WriteResponse;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import javax.security.auth.Destroyable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -39,7 +38,7 @@ public class FwLwM2MDevice extends BaseInstanceEnabler implements Destroyable {
private static final List<Integer> supportedResources = Arrays.asList(0, 1, 2, 3, 5, 6, 7, 9);
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope"));
private final ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor(getClass().getSimpleName() + "-test-scope");
private final AtomicInteger state = new AtomicInteger(0);

View File

@ -25,7 +25,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
@ -41,7 +41,6 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -64,7 +63,6 @@ public class TbMsgCountNodeTest {
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("1b21c7cc-0c9e-4ab1-b867-99451599e146"));
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("04dfbd38-10e5-47b7-925f-11e795db89e1"));
private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("msg-count-node-test");
private final TbMsg tickMsg = TbMsg.newMsg(TbMsgType.MSG_COUNT_SELF_MSG, RULE_NODE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING);
private ScheduledExecutorService executorService;
@ -78,7 +76,7 @@ public class TbMsgCountNodeTest {
public void setUp() {
node = new TbMsgCountNode();
config = new TbMsgCountNodeConfiguration().defaultConfiguration();
executorService = Executors.newSingleThreadScheduledExecutor(factory);
executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("msg-count-node-test");
}
@AfterEach

View File

@ -29,7 +29,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.TbContext;
@ -53,7 +53,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
@ -79,8 +78,6 @@ public class TbMsgGeneratorNodeTest extends AbstractRuleNodeUpgradeTest {
private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("1c649392-1f53-4377-b12f-1ba172611746"));
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("4470dfc2-f621-42b2-b82c-b5776d424140"));
private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("msg-generator-node-test");
private TbMsgGeneratorNode node;
private TbMsgGeneratorNodeConfiguration config;
private ScheduledExecutorService executorService;
@ -94,7 +91,7 @@ public class TbMsgGeneratorNodeTest extends AbstractRuleNodeUpgradeTest {
public void setUp() {
node = spy(new TbMsgGeneratorNode());
config = new TbMsgGeneratorNodeConfiguration().defaultConfiguration();
executorService = Executors.newSingleThreadScheduledExecutor(factory);
executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("msg-generator-node-test");
}
@AfterEach

View File

@ -29,7 +29,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.stubbing.Answer;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
@ -54,7 +54,6 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -78,8 +77,7 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest {
private TbContext ctx;
private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("de-duplication-node-test");
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(factory);
private final ScheduledExecutorService executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("de-duplication-node-test");
private final int deduplicationInterval = 1;
private TenantId tenantId;