Merge branch 'feature/entities-version-control' of github.com:thingsboard/thingsboard into feature/entities-version-control

This commit is contained in:
Igor Kulikov 2022-05-25 12:26:40 +03:00
commit a8966094bb
22 changed files with 634 additions and 77 deletions

View File

@ -1025,7 +1025,7 @@ queue:
topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}"
partitions: "${TB_QUEUE_VC_PARTITIONS:10}"
poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}"
pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:2000}"
pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}"
js:
# JS Eval request topic
request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}"
@ -1120,9 +1120,11 @@ metrics:
percentiles: "${METRICS_TIMER_PERCENTILES:0.5}"
vc:
thread_pool_size: "${TB_VC_POOL_SIZE:4}"
# Pool size for handling export tasks
thread_pool_size: "${TB_VC_POOL_SIZE:2}"
git:
repos-poll-interval: "${TB_VC_GIT_REPOS_POLL_INTERVAL_SEC:60}"
# Pool size for handling the git IO operations
io_pool_size: "${TB_VC_GIT_POOL_SIZE:3}"
repositories-folder: "${TB_VC_GIT_REPOSITORIES_FOLDER:${java.io.tmpdir}/repositories}"
management:

View File

@ -71,6 +71,8 @@ public class HashPartitionServiceTest {
queueRoutingInfoService);
ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core");
ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 10);
ReflectionTestUtils.setField(clusterRoutingService, "vcTopic", "tb.vc");
ReflectionTestUtils.setField(clusterRoutingService, "vcPartitions", 10);
ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName);
TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder()
.setServiceId("tb-core-0")

View File

@ -33,14 +33,12 @@ import org.apache.zookeeper.KeeperException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.event.ServiceListChangedEvent;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

View File

@ -17,8 +17,10 @@ package org.thingsboard.server.queue.settings;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@Lazy
@Data
@Component
public class TbQueueCoreSettings {

View File

@ -17,8 +17,10 @@ package org.thingsboard.server.queue.settings;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@Lazy
@Data
@Component
public class TbQueueRemoteJsInvokeSettings {

View File

@ -17,8 +17,10 @@ package org.thingsboard.server.queue.settings;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@Lazy
@Data
@Component
public class TbQueueRuleEngineSettings {

View File

@ -17,8 +17,10 @@ package org.thingsboard.server.queue.settings;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@Lazy
@Data
@Component
public class TbQueueTransportApiSettings {

View File

@ -17,8 +17,10 @@ package org.thingsboard.server.queue.settings;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@Lazy
@Data
@Component
public class TbQueueTransportNotificationSettings {

View File

@ -17,8 +17,10 @@ package org.thingsboard.server.queue.settings;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@Lazy
@Data
@Component
public class TbQueueVersionControlSettings {

View File

@ -15,8 +15,14 @@
*/
package org.thingsboard.server.service.sync.vc;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
@ -57,9 +63,10 @@ import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.NotificationsTopicService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbVersionControlQueueFactory;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbVersionControlComponent;
@ -67,6 +74,7 @@ import org.thingsboard.server.queue.util.TbVersionControlComponent;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -76,6 +84,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
@ -87,7 +97,8 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor
public class DefaultClusterVersionControlService extends TbApplicationEventListener<PartitionChangeEvent> implements ClusterVersionControlService {
private final TbServiceInfoProvider serviceInfoProvider;
private final PartitionService partitionService;
private final TbQueueProducerProvider producerProvider;
private final TbVersionControlQueueFactory queueFactory;
private final DataDecodingEncodingService encodingService;
private final GitRepositoryService vcService;
@ -105,11 +116,21 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
private long pollDuration;
@Value("${queue.vc.pack-processing-timeout:60000}")
private long packProcessingTimeout;
@Value("${vc.git.io_pool_size:3}")
private int ioPoolSize;
//We need to manually manage the threads since tasks for particular tenant need to be processed sequentially.
private final List<ListeningExecutorService> ioThreads = new ArrayList<>();
@PostConstruct
public void init() {
consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("vc-consumer"));
producer = queueFactory.createTbCoreNotificationsMsgProducer();
var threadFactory = ThingsBoardThreadFactory.forName("vc-io-thread");
for (int i = 0; i < ioPoolSize; i++) {
ioThreads.add(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(threadFactory)));
}
producer = producerProvider.getTbCoreNotificationsMsgProducer();
consumer = queueFactory.createToVersionControlMsgConsumer();
}
@ -122,11 +143,25 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
if (consumerExecutor != null) {
consumerExecutor.shutdownNow();
}
ioThreads.forEach(ExecutorService::shutdownNow);
}
@Override
protected void onTbApplicationEvent(PartitionChangeEvent event) {
//TODO: cleanup repositories that we no longer manage in this node.
for (TenantId tenantId : vcService.getActiveRepositoryTenants()) {
if (!partitionService.resolve(ServiceType.TB_VC_EXECUTOR, tenantId, tenantId).isMyPartition()) {
var lock = getRepoLock(tenantId);
lock.lock();
try {
pendingCommitMap.remove(tenantId);
vcService.clearRepository(tenantId);
} catch (Exception e) {
log.warn("[{}] Failed to cleanup the tenant repository", tenantId, e);
} finally {
lock.unlock();
}
}
}
consumer.subscribe(event.getPartitions());
}
@ -143,6 +178,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToVersionControlServiceMsg>> consumer) {
while (!stopped && !consumer.isStopped()) {
List<ListenableFuture<?>> futures = new ArrayList<>();
try {
List<TbProtoQueueMsg<ToVersionControlServiceMsg>> msgs = consumer.poll(pollDuration);
if (msgs.isEmpty()) {
@ -151,46 +187,17 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
for (TbProtoQueueMsg<ToVersionControlServiceMsg> msgWrapper : msgs) {
ToVersionControlServiceMsg msg = msgWrapper.getValue();
var ctx = new VersionControlRequestCtx(msg, msg.hasClearRepositoryRequest() ? null : getEntitiesVersionControlSettings(msg));
var lock = getRepoLock(ctx.getTenantId());
lock.lock();
try {
if (msg.hasClearRepositoryRequest()) {
handleClearRepositoryCommand(ctx);
} else {
if (msg.hasTestRepositoryRequest()) {
handleTestRepositoryCommand(ctx);
} else if (msg.hasInitRepositoryRequest()) {
handleInitRepositoryCommand(ctx);
} else {
var currentSettings = vcService.getRepositorySettings(ctx.getTenantId());
var newSettings = ctx.getSettings();
if (!newSettings.equals(currentSettings)) {
vcService.initRepository(ctx.getTenantId(), ctx.getSettings());
} else {
vcService.fetch(ctx.getTenantId());
}
if (msg.hasCommitRequest()) {
handleCommitRequest(ctx, msg.getCommitRequest());
} else if (msg.hasListBranchesRequest()) {
handleListBranches(ctx, msg.getListBranchesRequest());
} else if (msg.hasListEntitiesRequest()) {
handleListEntities(ctx, msg.getListEntitiesRequest());
} else if (msg.hasListVersionRequest()) {
handleListVersions(ctx, msg.getListVersionRequest());
} else if (msg.hasEntityContentRequest()) {
handleEntityContentRequest(ctx, msg.getEntityContentRequest());
} else if (msg.hasEntitiesContentRequest()) {
handleEntitiesContentRequest(ctx, msg.getEntitiesContentRequest());
}
}
}
} catch (Exception e) {
reply(ctx, Optional.of(e));
} finally {
lock.unlock();
}
long startTs = System.currentTimeMillis();
log.trace("[{}][{}] Submitting task.", ctx.getTenantId(), ctx.getRequestId());
ListenableFuture<Void> future = ioThreads.get(ctx.getTenantId().hashCode() % ioPoolSize).submit(() -> processMessage(ctx, msg));
logTaskExecution(ctx, future, startTs);
futures.add(future);
}
try {
Futures.allAsList(futures).get(packProcessingTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
log.info("Timeout for processing the version control tasks.", e);
}
//TODO: handle timeouts and async processing for multiple tenants;
consumer.commit();
} catch (Exception e) {
if (!stopped) {
@ -206,6 +213,51 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
log.info("TB Version Control request consumer stopped.");
}
private Void processMessage(VersionControlRequestCtx ctx, ToVersionControlServiceMsg msg) {
var lock = getRepoLock(ctx.getTenantId());
lock.lock();
try {
if (msg.hasClearRepositoryRequest()) {
handleClearRepositoryCommand(ctx);
} else {
if (msg.hasTestRepositoryRequest()) {
handleTestRepositoryCommand(ctx);
} else if (msg.hasInitRepositoryRequest()) {
handleInitRepositoryCommand(ctx);
} else {
var currentSettings = vcService.getRepositorySettings(ctx.getTenantId());
var newSettings = ctx.getSettings();
if (!newSettings.equals(currentSettings)) {
vcService.initRepository(ctx.getTenantId(), ctx.getSettings());
}
if (msg.hasCommitRequest()) {
handleCommitRequest(ctx, msg.getCommitRequest());
} else if (msg.hasListBranchesRequest()) {
vcService.fetch(ctx.getTenantId());
handleListBranches(ctx, msg.getListBranchesRequest());
} else if (msg.hasListEntitiesRequest()) {
vcService.fetch(ctx.getTenantId());
handleListEntities(ctx, msg.getListEntitiesRequest());
} else if (msg.hasListVersionRequest()) {
vcService.fetch(ctx.getTenantId());
handleListVersions(ctx, msg.getListVersionRequest());
} else if (msg.hasEntityContentRequest()) {
vcService.fetch(ctx.getTenantId());
handleEntityContentRequest(ctx, msg.getEntityContentRequest());
} else if (msg.hasEntitiesContentRequest()) {
vcService.fetch(ctx.getTenantId());
handleEntitiesContentRequest(ctx, msg.getEntitiesContentRequest());
}
}
}
} catch (Exception e) {
reply(ctx, Optional.of(e));
} finally {
lock.unlock();
}
return null;
}
private void handleEntitiesContentRequest(VersionControlRequestCtx ctx, EntitiesContentRequestMsg request) throws Exception {
var entityType = EntityType.valueOf(request.getEntityType());
String path = getRelativePath(entityType, null);
@ -273,6 +325,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
var tenantId = ctx.getTenantId();
UUID txId = UUID.fromString(request.getTxId());
if (request.hasPrepareMsg()) {
vcService.fetch(ctx.getTenantId());
prepareCommit(ctx, txId, request.getPrepareMsg());
} else if (request.hasAbortMsg()) {
PendingCommit current = pendingCommitMap.get(tenantId);
@ -415,4 +468,21 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
private Lock getRepoLock(TenantId tenantId) {
return tenantRepoLocks.computeIfAbsent(tenantId, t -> new ReentrantLock(true));
}
private void logTaskExecution(VersionControlRequestCtx ctx, ListenableFuture<Void> future, long startTs) {
if (log.isTraceEnabled()) {
Futures.addCallback(future, new FutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object result) {
log.trace("[{}][{}] Task processing took: {}ms", ctx.getTenantId(), ctx.getRequestId(), (System.currentTimeMillis() - startTs));
}
@Override
public void onFailure(Throwable t) {
log.trace("[{}][{}] Task failed: ", ctx.getTenantId(), ctx.getRequestId(), t);
}
}, MoreExecutors.directExecutor());
}
}
}

View File

@ -35,19 +35,17 @@ import org.thingsboard.server.common.data.sync.vc.VersionCreationResult;
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@ -61,10 +59,6 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
@Value("${vc.git.repositories-folder:${java.io.tmpdir}/repositories}")
private String repositoriesFolder;
@Value("${vc.git.repos-poll-interval:60}")
private long reposPollInterval;
private ScheduledExecutorService scheduler;
private final Map<TenantId, GitRepository> repositories = new ConcurrentHashMap<>();
@PostConstruct
@ -72,24 +66,11 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
if (StringUtils.isEmpty(repositoriesFolder)) {
repositoriesFolder = defaultFolder;
}
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleWithFixedDelay(() -> {
repositories.forEach((tenantId, repository) -> {
try {
repository.fetch();
log.info("Fetching remote repository for tenant {}", tenantId);
} catch (Exception e) {
log.warn("Failed to fetch repository for tenant {}", tenantId, e);
}
});
}, reposPollInterval, reposPollInterval, TimeUnit.SECONDS);
}
@PreDestroy
public void stop() {
if (scheduler != null) {
scheduler.shutdownNow();
}
@Override
public Set<TenantId> getActiveRepositoryTenants() {
return new HashSet<>(repositories.keySet());
}
@Override
@ -151,6 +132,7 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
@SneakyThrows
@Override
public void cleanUp(PendingCommit commit) {
log.debug("[{}] Cleanup tenant repository started.", commit.getTenantId());
GitRepository repository = checkRepository(commit.getTenantId());
try {
repository.createAndCheckoutOrphanBranch(EntityId.NULL_UUID.toString());
@ -161,6 +143,7 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
}
repository.resetAndClean();
repository.deleteLocalBranchIfExists(commit.getWorkingBranch());
log.debug("[{}] Cleanup tenant repository completed.", commit.getTenantId());
}
@Override
@ -172,7 +155,9 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
public void fetch(TenantId tenantId) throws GitAPIException {
var repository = repositories.get(tenantId);
if (repository != null) {
log.debug("[{}] Fetching tenant repository.", tenantId);
repository.fetch();
log.debug("[{}] Fetched tenant repository.", tenantId);
}
}
@ -232,6 +217,7 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
@Override
public void initRepository(TenantId tenantId, EntitiesVersionControlSettings settings) throws Exception {
clearRepository(tenantId);
log.debug("[{}] Init tenant repository started.", tenantId);
Path repositoryDirectory = Path.of(repositoriesFolder, tenantId.getId().toString());
GitRepository repository;
if (Files.exists(repositoryDirectory)) {
@ -241,6 +227,7 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
Files.createDirectories(repositoryDirectory);
repository = GitRepository.clone(settings, repositoryDirectory.toFile());
repositories.put(tenantId, repository);
log.debug("[{}] Init tenant repository completed.", tenantId);
}
@Override
@ -253,8 +240,10 @@ public class DefaultGitRepositoryService implements GitRepositoryService {
public void clearRepository(TenantId tenantId) throws IOException {
GitRepository repository = repositories.get(tenantId);
if (repository != null) {
log.debug("[{}] Clear tenant repository started.", tenantId);
FileUtils.deleteDirectory(new File(repository.getDirectory()));
repositories.remove(tenantId);
log.debug("[{}] Clear tenant repository completed.", tenantId);
}
}

View File

@ -26,9 +26,12 @@ import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public interface GitRepositoryService {
Set<TenantId> getActiveRepositoryTenants();
void prepareCommit(PendingCommit pendingCommit);
PageData<EntityVersion> listVersions(TenantId tenantId, String branch, String path, PageLink pageLink) throws Exception;

View File

@ -41,6 +41,7 @@
<modules>
<module>tb</module>
<module>vc-executor</module>
<module>vc-executor-docker</module>
<module>js-executor</module>
<module>web-ui</module>
<module>tb-node</module>

View File

@ -0,0 +1,33 @@
#
# Copyright © 2016-2022 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM thingsboard/openjdk11
COPY start-tb-vc-executor.sh ${pkg.name}.deb /tmp/
RUN chmod a+x /tmp/*.sh \
&& mv /tmp/start-tb-vc-executor.sh /usr/bin
RUN yes | dpkg -i /tmp/${pkg.name}.deb
RUN rm /tmp/${pkg.name}.deb
RUN systemctl --no-reload disable --now ${pkg.name}.service > /dev/null 2>&1 || :
RUN chmod 555 ${pkg.installFolder}/bin/${pkg.name}.jar
USER ${pkg.user}
CMD ["start-tb-vc-executor.sh"]

View File

@ -0,0 +1,33 @@
#!/bin/bash
#
# Copyright © 2016-2022 The Thingsboard Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
CONF_FOLDER="/config"
jarfile=${pkg.installFolder}/bin/${pkg.name}.jar
configfile=${pkg.name}.conf
source "${CONF_FOLDER}/${configfile}"
export LOADER_PATH=/config,${LOADER_PATH}
echo "Starting '${project.name}' ..."
cd ${pkg.installFolder}/bin
exec java -cp ${jarfile} $JAVA_OPTS -Dloader.main=org.thingsboard.server.vc.ThingsboardVersionControlExecutorApplication \
-Dspring.jpa.hibernate.ddl-auto=none \
-Dlogging.config=/config/logback.xml \
org.springframework.boot.loader.PropertiesLauncher

View File

@ -0,0 +1,190 @@
<!--
Copyright © 2016-2022 The Thingsboard Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.thingsboard</groupId>
<version>3.4.0-SNAPSHOT</version>
<artifactId>msa</artifactId>
</parent>
<groupId>org.thingsboard.msa</groupId>
<artifactId>vc-executor-docker</artifactId>
<packaging>pom</packaging>
<name>ThingsBoard Version Control Executor Microservice</name>
<url>https://thingsboard.io</url>
<description>ThingsBoard Version Control Executor Microservice</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../..</main.dir>
<pkg.name>tb-vc-executor</pkg.name>
<docker.name>tb-vc-executor</docker.name>
<pkg.logFolder>/var/log/${pkg.name}</pkg.logFolder>
<pkg.installFolder>/usr/share/${pkg.name}</pkg.installFolder>
<docker.push-arm-amd-image.phase>pre-integration-test</docker.push-arm-amd-image.phase>
</properties>
<dependencies>
<dependency>
<groupId>org.thingsboard.msa</groupId>
<artifactId>vc-executor</artifactId>
<version>${project.version}</version>
<classifier>deb</classifier>
<type>deb</type>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-tb-vc-executor-deb</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.thingsboard.msa</groupId>
<artifactId>vc-executor</artifactId>
<classifier>deb</classifier>
<type>deb</type>
<destFileName>${pkg.name}.deb</destFileName>
<outputDirectory>${project.build.directory}</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-docker-config</id>
<phase>process-resources</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}</outputDirectory>
<resources>
<resource>
<directory>docker</directory>
<filtering>true</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.spotify</groupId>
<artifactId>dockerfile-maven-plugin</artifactId>
<executions>
<execution>
<id>build-docker-image</id>
<phase>pre-integration-test</phase>
<goals>
<goal>build</goal>
</goals>
<configuration>
<skip>${dockerfile.skip}</skip>
<repository>${docker.repo}/${docker.name}</repository>
<verbose>true</verbose>
<googleContainerRegistryEnabled>false</googleContainerRegistryEnabled>
<contextDirectory>${project.build.directory}</contextDirectory>
</configuration>
</execution>
<execution>
<id>tag-docker-image</id>
<phase>pre-integration-test</phase>
<goals>
<goal>tag</goal>
</goals>
<configuration>
<skip>${dockerfile.skip}</skip>
<repository>${docker.repo}/${docker.name}</repository>
<tag>${project.version}</tag>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>push-docker-image</id>
<activation>
<property>
<name>push-docker-image</name>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>com.spotify</groupId>
<artifactId>dockerfile-maven-plugin</artifactId>
<executions>
<execution>
<id>push-latest-docker-image</id>
<phase>pre-integration-test</phase>
<goals>
<goal>push</goal>
</goals>
<configuration>
<tag>latest</tag>
<repository>${docker.repo}/${docker.name}</repository>
</configuration>
</execution>
<execution>
<id>push-version-docker-image</id>
<phase>pre-integration-test</phase>
<goals>
<goal>push</goal>
</goals>
<configuration>
<tag>${project.version}</tag>
<repository>${docker.repo}/${docker.name}</repository>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<repositories>
<repository>
<id>jenkins</id>
<name>Jenkins Repository</name>
<url>https://repo.jenkins-ci.org/releases</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>

View File

@ -38,7 +38,7 @@
<pkg.disabled>false</pkg.disabled>
<pkg.process-resources.phase>process-resources</pkg.process-resources.phase>
<pkg.package.phase>package</pkg.package.phase>
<pkg.name>tb-mqtt-transport</pkg.name>
<pkg.name>tb-vc-executor</pkg.name>
<pkg.copyInstallScripts>false</pkg.copyInstallScripts>
<pkg.win.dist>${project.build.directory}/windows</pkg.win.dist>
<pkg.implementationTitle>ThingsBoard Version Control Executor Service</pkg.implementationTitle>
@ -46,6 +46,10 @@
</properties>
<dependencies>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>queue</artifactId>
</dependency>
<dependency>
<groupId>org.thingsboard.common</groupId>
<artifactId>version-control</artifactId>

View File

@ -15,17 +15,17 @@ package org.thingsboard.server.vc; /**
*/
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.util.Arrays;
@SpringBootConfiguration
@SpringBootApplication
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server.vc", "org.thingsboard.server.common", "org.thingsboard.server.service.sync.vc"})
@ComponentScan({"org.thingsboard.server", "org.thingsboard.server.common", "org.thingsboard.server.service.sync.vc"})
public class ThingsboardVersionControlExecutorApplication {
private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";

View File

@ -0,0 +1,31 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.vc.service;
import org.springframework.stereotype.Service;
import org.thingsboard.server.queue.discovery.QueueRoutingInfo;
import org.thingsboard.server.queue.discovery.QueueRoutingInfoService;
import java.util.Collections;
import java.util.List;
@Service
public class VersionControlQueueRoutingInfoService implements QueueRoutingInfoService {
@Override
public List<QueueRoutingInfo> getAllQueuesRoutingInfo() {
return Collections.emptyList();
}
}

View File

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.vc.service;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.queue.discovery.TenantRoutingInfo;
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
@Service
public class VersionControlTenantRoutingInfoService implements TenantRoutingInfoService {
@Override
public TenantRoutingInfo getRoutingInfo(TenantId tenantId) {
//This dummy implementation is ok since Version Control service does not produce any rule engine messages.
return new TenantRoutingInfo(tenantId, false, false);
}
}

View File

@ -25,6 +25,7 @@
</encoder>
</appender>
<logger name="org.thingsboard.server.queue" level="INFO" />
<logger name="org.thingsboard.server" level="TRACE" />
<root level="INFO">

View File

@ -23,5 +23,163 @@ server:
# Server bind address (has no effect if web-environment is disabled).
address: "${HTTP_BIND_ADDRESS:0.0.0.0}"
# Server bind port (has no effect if web-environment is disabled).
port: "${HTTP_BIND_PORT:8080}"
port: "${HTTP_BIND_PORT:8086}"
# Zookeeper connection parameters. Used for service discovery.
zk:
# Enable/disable zookeeper discovery service.
enabled: "${ZOOKEEPER_ENABLED:true}"
# Zookeeper connect string
url: "${ZOOKEEPER_URL:localhost:2181}"
# Zookeeper retry interval in milliseconds
retry_interval_ms: "${ZOOKEEPER_RETRY_INTERVAL_MS:3000}"
# Zookeeper connection timeout in milliseconds
connection_timeout_ms: "${ZOOKEEPER_CONNECTION_TIMEOUT_MS:3000}"
# Zookeeper session timeout in milliseconds
session_timeout_ms: "${ZOOKEEPER_SESSION_TIMEOUT_MS:3000}"
# Name of the directory in zookeeper 'filesystem'
zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
queue:
type: "${TB_QUEUE_TYPE:kafka}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ)
in_memory:
stats:
# For debug lvl
print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
kafka:
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
acks: "${TB_KAFKA_ACKS:all}"
retries: "${TB_KAFKA_RETRIES:1}"
compression.type: "${TB_KAFKA_COMPRESSION_TYPE:none}" # none or gzip
batch.size: "${TB_KAFKA_BATCH_SIZE:16384}"
linger.ms: "${TB_KAFKA_LINGER_MS:1}"
max.request.size: "${TB_KAFKA_MAX_REQUEST_SIZE:1048576}"
max.in.flight.requests.per.connection: "${TB_KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:5}"
buffer.memory: "${TB_BUFFER_MEMORY:33554432}"
replication_factor: "${TB_QUEUE_KAFKA_REPLICATION_FACTOR:1}"
max_poll_interval_ms: "${TB_QUEUE_KAFKA_MAX_POLL_INTERVAL_MS:300000}"
max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
confluent:
ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
# Key-value properties for Kafka consumer per specific topic, e.g. tb_ota_package is a topic name for ota, tb_rule_engine.sq is a topic name for default SequentialByOriginator queue.
# Check TB_QUEUE_CORE_OTA_TOPIC and TB_QUEUE_RE_SQ_TOPIC params
consumer-properties-per-topic:
tb_ota_package:
- key: max.poll.records
value: "${TB_QUEUE_KAFKA_OTA_MAX_POLL_RECORDS:10}"
# tb_rule_engine.sq:
# - key: max.poll.records
# value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}"
other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
- key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
- key: "session.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
value: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds)
topic-properties:
rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1}"
ota-updates: "${TB_QUEUE_KAFKA_OTA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
consumer-stats:
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
print-interval-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}"
kafka-response-timeout-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_RESPONSE_TIMEOUT_MS:1000}"
aws_sqs:
use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}"
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"
threads_per_topic: "${TB_QUEUE_AWS_SQS_THREADS_PER_TOPIC:1}"
queue-properties:
rule-engine: "${TB_QUEUE_AWS_SQS_RE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
core: "${TB_QUEUE_AWS_SQS_CORE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
transport-api: "${TB_QUEUE_AWS_SQS_TA_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
notifications: "${TB_QUEUE_AWS_SQS_NOTIFICATIONS_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
js-executor: "${TB_QUEUE_AWS_SQS_JE_QUEUE_PROPERTIES:VisibilityTimeout:30;MaximumMessageSize:262144;MessageRetentionPeriod:604800}"
# VisibilityTimeout in seconds;MaximumMessageSize in bytes;MessageRetentionPeriod in seconds
pubsub:
project_id: "${TB_QUEUE_PUBSUB_PROJECT_ID:YOUR_PROJECT_ID}"
service_account: "${TB_QUEUE_PUBSUB_SERVICE_ACCOUNT:YOUR_SERVICE_ACCOUNT}"
max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes
max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}"
queue-properties:
rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
transport-api: "${TB_QUEUE_PUBSUB_TA_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
notifications: "${TB_QUEUE_PUBSUB_NOTIFICATIONS_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
js-executor: "${TB_QUEUE_PUBSUB_JE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"
service_bus:
namespace_name: "${TB_QUEUE_SERVICE_BUS_NAMESPACE_NAME:YOUR_NAMESPACE_NAME}"
sas_key_name: "${TB_QUEUE_SERVICE_BUS_SAS_KEY_NAME:YOUR_SAS_KEY_NAME}"
sas_key: "${TB_QUEUE_SERVICE_BUS_SAS_KEY:YOUR_SAS_KEY}"
max_messages: "${TB_QUEUE_SERVICE_BUS_MAX_MESSAGES:1000}"
queue-properties:
rule-engine: "${TB_QUEUE_SERVICE_BUS_RE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
core: "${TB_QUEUE_SERVICE_BUS_CORE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
transport-api: "${TB_QUEUE_SERVICE_BUS_TA_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
notifications: "${TB_QUEUE_SERVICE_BUS_NOTIFICATIONS_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
js-executor: "${TB_QUEUE_SERVICE_BUS_JE_QUEUE_PROPERTIES:lockDurationInSec:30;maxSizeInMb:1024;messageTimeToLiveInSec:604800}"
rabbitmq:
exchange_name: "${TB_QUEUE_RABBIT_MQ_EXCHANGE_NAME:}"
host: "${TB_QUEUE_RABBIT_MQ_HOST:localhost}"
port: "${TB_QUEUE_RABBIT_MQ_PORT:5672}"
virtual_host: "${TB_QUEUE_RABBIT_MQ_VIRTUAL_HOST:/}"
username: "${TB_QUEUE_RABBIT_MQ_USERNAME:YOUR_USERNAME}"
password: "${TB_QUEUE_RABBIT_MQ_PASSWORD:YOUR_PASSWORD}"
automatic_recovery_enabled: "${TB_QUEUE_RABBIT_MQ_AUTOMATIC_RECOVERY_ENABLED:false}"
connection_timeout: "${TB_QUEUE_RABBIT_MQ_CONNECTION_TIMEOUT:60000}"
handshake_timeout: "${TB_QUEUE_RABBIT_MQ_HANDSHAKE_TIMEOUT:10000}"
queue-properties:
rule-engine: "${TB_QUEUE_RABBIT_MQ_RE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
core: "${TB_QUEUE_RABBIT_MQ_CORE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
transport-api: "${TB_QUEUE_RABBIT_MQ_TA_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
js-executor: "${TB_QUEUE_RABBIT_MQ_JE_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
partitions:
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" # murmur3_32, murmur3_128 or sha256
core:
topic: "${TB_QUEUE_CORE_TOPIC:tb_core}"
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_CORE_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:2000}"
ota:
topic: "${TB_QUEUE_CORE_OTA_TOPIC:tb_ota_package}"
pack-interval-ms: "${TB_QUEUE_CORE_OTA_PACK_INTERVAL_MS:60000}"
pack-size: "${TB_QUEUE_CORE_OTA_PACK_SIZE:100}"
usage-stats-topic: "${TB_QUEUE_US_TOPIC:tb_usage_stats}"
stats:
enabled: "${TB_QUEUE_CORE_STATS_ENABLED:true}"
print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:60000}"
vc:
topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}"
partitions: "${TB_QUEUE_VC_PARTITIONS:10}"
poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}"
pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}"
vc:
# Pool size for handling export tasks
thread_pool_size: "${TB_VC_POOL_SIZE:2}"
git:
# Pool size for handling the git IO operations
io_pool_size: "${TB_VC_GIT_POOL_SIZE:3}"
repositories-folder: "${TB_VC_GIT_REPOSITORIES_FOLDER:${java.io.tmpdir}/repositories}"
metrics:
# Enable/disable actuator metrics.
enabled: "${METRICS_ENABLED:false}"
timer:
# Metrics percentiles returned by actuator for timer metrics. List of double values (divided by ,).
percentiles: "${METRICS_TIMER_PERCENTILES:0.5}"
service:
type: "${TB_SERVICE_TYPE:tb-vc-executor}"
# Unique id for this service (autogenerated if empty)
id: "${TB_SERVICE_ID:}"