diff --git a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java index 215eff736a..a06ef0fe05 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/PsqlTsDatabaseUpgradeService.java @@ -46,7 +46,7 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe private static final String CREATE_PARTITION_TS_KV_TABLE = "create_partition_ts_kv_table()"; private static final String CREATE_NEW_TS_KV_LATEST_TABLE = "create_new_ts_kv_latest_table()"; - private static final String CREATE_PARTITIONS = "create_partitions()"; + private static final String CREATE_PARTITIONS = "create_partitions(IN partition_type varchar)"; private static final String CREATE_TS_KV_DICTIONARY_TABLE = "create_ts_kv_dictionary_table()"; private static final String INSERT_INTO_DICTIONARY = "insert_into_dictionary()"; private static final String INSERT_INTO_TS_KV = "insert_into_ts_kv()"; @@ -108,7 +108,6 @@ public class PsqlTsDatabaseUpgradeService extends AbstractSqlTsDatabaseUpgradeSe executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV); executeQuery(conn, DROP_PROCEDURE_CREATE_NEW_TS_KV_LATEST_TABLE); executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); - executeQuery(conn, DROP_PROCEDURE_INSERT_INTO_TS_KV_LATEST); executeQuery(conn, DROP_FUNCTION_GET_PARTITION_DATA); executeQuery(conn, "ALTER TABLE ts_kv ADD COLUMN IF NOT EXISTS json_v json;"); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 541337579d..6b1c051eeb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -81,14 +81,24 @@ public class TbKafkaConsumerTemplate implements TbQueueCon @Override public void subscribe() { - partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); - subscribed = false; + consumerLock.lock(); + try { + partitions = Collections.singleton(new TopicPartitionInfo(topic, null, null, true)); + subscribed = false; + } finally { + consumerLock.unlock(); + } } @Override public void subscribe(Set partitions) { - this.partitions = partitions; - subscribed = false; + consumerLock.lock(); + try { + this.partitions = partitions; + subscribed = false; + } finally { + consumerLock.unlock(); + } } @Override @@ -100,13 +110,11 @@ public class TbKafkaConsumerTemplate implements TbQueueCon log.debug("Failed to await subscription", e); } } else { + consumerLock.lock(); try { - consumerLock.lock(); - if (!subscribed) { List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); topicNames.forEach(admin::createTopicIfNotExists); - consumer.unsubscribe(); consumer.subscribe(topicNames); subscribed = true; } @@ -132,8 +140,8 @@ public class TbKafkaConsumerTemplate implements TbQueueCon @Override public void commit() { + consumerLock.lock(); try { - consumerLock.lock(); consumer.commitAsync(); } finally { consumerLock.unlock(); @@ -142,8 +150,8 @@ public class TbKafkaConsumerTemplate implements TbQueueCon @Override public void unsubscribe() { + consumerLock.lock(); try { - consumerLock.lock(); if (consumer != null) { consumer.unsubscribe(); consumer.close(); diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2f0d0fb3b3..9061f3e2de 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -28,7 +28,7 @@ services: ZOO_SERVERS: server.1=zookeeper:2888:3888;zookeeper:2181 kafka: restart: always - image: "wurstmeister/kafka:2.12-2.2.1" + image: "wurstmeister/kafka:2.12-2.3.0" ports: - "9092:9092" env_file: