diff --git a/Dockerfile.cassandra b/Dockerfile.cassandra deleted file mode 100644 index b68b78a665..0000000000 --- a/Dockerfile.cassandra +++ /dev/null @@ -1,13 +0,0 @@ -FROM cassandra - -WORKDIR /opt/cassandra - -COPY dao/src/main/resources/cassandra/schema.cql /opt/cassandra - -COPY entrypoint-with-db-init.sh /opt/cassandra/entrypoint-with-db-init.sh - -RUN chmod +x /opt/cassandra/entrypoint-with-db-init.sh - -ENTRYPOINT ["/opt/cassandra/entrypoint-with-db-init.sh"] - -CMD ["cassandra", "-f"] diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java index f856ed6a10..6e47e3550d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/ValidationCallback.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.actors.plugin; -import com.hazelcast.util.function.Consumer; +import java.util.function.Consumer; import org.thingsboard.server.extensions.api.exception.AccessDeniedException; import org.thingsboard.server.extensions.api.exception.EntityNotFoundException; import org.thingsboard.server.extensions.api.exception.InternalErrorException; diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java index fc117f2e72..14bb636830 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java +++ b/application/src/main/java/org/thingsboard/server/actors/rpc/BasicRpcSessionListener.java @@ -17,7 +17,6 @@ package org.thingsboard.server.actors.rpc; import akka.actor.ActorRef; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.service.ActorService; import org.thingsboard.server.gen.cluster.ClusterAPIProtos; import org.thingsboard.server.service.cluster.rpc.GrpcSession; @@ -58,7 +57,7 @@ public class BasicRpcSessionListener implements GrpcSessionListener { log.trace("{} Service [{}] received session actor msg {}", getType(session), session.getRemoteServer(), clusterMessage); - service.onRecievedMsg(clusterMessage); + service.onReceivedMsg(session.getRemoteServer(), clusterMessage); } @Override diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java index ab21e5d475..c5c6553612 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcManagerActor.java @@ -89,9 +89,9 @@ public class RpcManagerActor extends ContextAwareActor { } private void onMsg(ClusterAPIProtos.ClusterMessage msg) { - if (msg.hasServerAdresss()) { - ServerAddress address = new ServerAddress(msg.getServerAdresss().getHost(), - msg.getServerAdresss().getPort()); + if (msg.hasServerAddress()) { + ServerAddress address = new ServerAddress(msg.getServerAddress().getHost(), + msg.getServerAddress().getPort()); SessionActorInfo session = sessionActors.get(address); if (session != null) { log.debug("{} Forwarding msg to session actor", address); @@ -102,7 +102,7 @@ public class RpcManagerActor extends ContextAwareActor { if (queue == null) { queue = new LinkedList<>(); pendingMsgs.put(new ServerAddress( - msg.getServerAdresss().getHost(), msg.getServerAdresss().getPort()), queue); + msg.getServerAddress().getHost(), msg.getServerAddress().getPort()), queue); } queue.add(msg); } diff --git a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java index 9d0f3e8824..c9cf8696e5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/rpc/RpcSessionActor.java @@ -78,7 +78,7 @@ public class RpcSessionActor extends ContextAwareActor { private void initSession(RpcSessionCreateRequestMsg msg) { log.info("[{}] Initializing session", context().self()); ServerAddress remoteServer = msg.getRemoteAddress(); - listener = new BasicRpcSessionListener(systemContext, context().parent(), context().self()); + listener = new BasicRpcSessionListener(systemContext.getActorService(), context().parent(), context().self()); if (msg.getRemoteAddress() == null) { // Server session session = new GrpcSession(listener); @@ -119,7 +119,7 @@ public class RpcSessionActor extends ContextAwareActor { private ClusterAPIProtos.ClusterMessage toConnectMsg() { ServerAddress instance = systemContext.getDiscoveryService().getCurrentServer().getServerAddress(); - return ClusterAPIProtos.ClusterMessage.newBuilder().setMessageType(CONNECT_RPC_MESSAGE).setServerAdresss( + return ClusterAPIProtos.ClusterMessage.newBuilder().setMessageType(CONNECT_RPC_MESSAGE).setServerAddress( ClusterAPIProtos.ServerAddress.newBuilder().setHost(instance.getHost()) .setPort(instance.getPort()).build()).build(); } diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index 503ec705ff..56f3228afe 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -56,8 +56,6 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE; -import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE_VALUE; -import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_NETWORK_SERVER_DATA_MESSAGE; @Service @Slf4j @@ -211,8 +209,8 @@ public class DefaultActorService implements ActorService { } @Override - public void onRecievedMsg(ClusterAPIProtos.ClusterMessage msg) { - ServerAddress serverAddress = new ServerAddress(msg.getServerAddress().getHost(), msg.getServerAddress().getPort()); + public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) { + ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort()); switch (msg.getMessageType()) { case CLUSTER_ACTOR_MESSAGE: java.util.Optional decodedMsg = actorContext.getEncodingService() diff --git a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java index 0e0175b105..fab51a5eb3 100644 --- a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java @@ -87,7 +87,7 @@ abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgP return address; } - protected Optional forwardToAppActorIfAdressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional oldAddress) { + protected Optional forwardToAppActorIfAddressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional oldAddress) { Optional newAddress = systemContext.getRoutingService().resolveById(toForward.getDeviceId()); if (!newAddress.equals(oldAddress)) { diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java index 7f520b51e1..6f71d79e06 100644 --- a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java @@ -73,7 +73,7 @@ class SyncMsgProcessor extends AbstractSessionActorMsgProcessor { @Override public void processClusterEvent(ActorContext context, ClusterEventMsg msg) { if (pendingResponse) { - Optional newTargetServer = forwardToAppActorIfAdressChanged(context, pendingMsg, currentTargetServer); + Optional newTargetServer = forwardToAppActorIfAddressChanged(context, pendingMsg, currentTargetServer); if (logger.isDebugEnabled()) { if (!newTargetServer.equals(currentTargetServer)) { if (newTargetServer.isPresent()) { diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java index fbda1196e4..6002b0e67e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/discovery/ZkDiscoveryService.java @@ -32,9 +32,11 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationListener; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.springframework.util.Assert; import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import org.thingsboard.server.utils.MiscUtils; import javax.annotation.PostConstruct; @@ -68,6 +70,10 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi @Autowired private ServerInstanceService serverInstance; + @Autowired + @Lazy + private TelemetrySubscriptionService tsSubService; + private final List listeners = new CopyOnWriteArrayList<>(); private CuratorFramework client; @@ -196,12 +202,14 @@ public class ZkDiscoveryService implements DiscoveryService, PathChildrenCacheLi log.info("Processing [{}] event for [{}:{}]", pathChildrenCacheEvent.getType(), instance.getHost(), instance.getPort()); switch (pathChildrenCacheEvent.getType()) { case CHILD_ADDED: + tsSubService.onClusterUpdate(); listeners.forEach(listener -> listener.onServerAdded(instance)); break; case CHILD_UPDATED: listeners.forEach(listener -> listener.onServerUpdated(instance)); break; case CHILD_REMOVED: + tsSubService.onClusterUpdate(); listeners.forEach(listener -> listener.onServerRemoved(instance)); break; default: diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java index cb8808e733..b73a6f5161 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java index e56fb7534d..7216c4319f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/GrpcSession.java @@ -61,7 +61,7 @@ public final class GrpcSession implements Closeable { public void onNext(ClusterAPIProtos.ClusterMessage clusterMessage) { if (!connected && clusterMessage.getMessageType() == ClusterAPIProtos.MessageType.CONNECT_RPC_MESSAGE) { connected = true; - ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAdresss().getHost(), clusterMessage.getServerAdresss().getPort()); + ServerAddress rpcAddress = new ServerAddress(clusterMessage.getServerAddress().getHost(), clusterMessage.getServerAddress().getPort()); remoteServer = new ServerAddress(rpcAddress.getHost(), rpcAddress.getPort()); listener.onConnected(GrpcSession.this); } diff --git a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java index 88d17e07f6..33f3847ddc 100644 --- a/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java +++ b/application/src/main/java/org/thingsboard/server/service/cluster/rpc/RpcMsgListener.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.cluster.rpc; import org.thingsboard.server.actors.rpc.RpcBroadcastMsg; import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg; +import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.gen.cluster.ClusterAPIProtos; /** @@ -24,7 +25,7 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos; */ public interface RpcMsgListener { - void onRecievedMsg(ClusterAPIProtos.ClusterMessage msg); + void onReceivedMsg(ServerAddress remoteServer, ClusterAPIProtos.ClusterMessage msg); void onSendMsg(ClusterAPIProtos.ClusterMessage msg); void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg); void onBroadcastMsg(RpcBroadcastMsg msg); diff --git a/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java b/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java index f6a365e4a0..2cf9299743 100644 --- a/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java +++ b/application/src/main/java/org/thingsboard/server/service/encoding/ProtoWithJavaSerializationDecodingEncodingService.java @@ -25,7 +25,7 @@ import org.thingsboard.server.gen.cluster.ClusterAPIProtos; import java.util.Optional; -import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_NETWORK_SERVER_DATA_MESSAGE; +import static org.thingsboard.server.gen.cluster.ClusterAPIProtos.MessageType.CLUSTER_ACTOR_MESSAGE; @Slf4j @@ -55,12 +55,12 @@ public class ProtoWithJavaSerializationDecodingEncodingService implements DataDe TbActorMsg msg) { return ClusterAPIProtos.ClusterMessage .newBuilder() - .setServerAdresss(ClusterAPIProtos.ServerAddress + .setServerAddress(ClusterAPIProtos.ServerAddress .newBuilder() .setHost(serverAddress.getHost()) .setPort(serverAddress.getPort()) .build()) - .setMessageType(CLUSTER_NETWORK_SERVER_DATA_MESSAGE) + .setMessageType(CLUSTER_ACTOR_MESSAGE) .setPayload(ByteString.copyFrom(encode(msg))).build(); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index e99bf79a09..f3cd32cbfc 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index 57f3876366..3ea0ceefab 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -21,7 +21,7 @@ import com.google.common.base.Function; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.hazelcast.util.function.Consumer; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index ad29dd2b59..45d4b47cb2 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -107,7 +107,7 @@ mqtt: # CoAP server parameters coap: # Enable/disable coap transport protocol. - enabled: "${COAP_ENABLED:true}" + enabled: "${COAP_ENABLED:false}" bind_address: "${COAP_BIND_ADDRESS:0.0.0.0}" bind_port: "${COAP_BIND_PORT:5683}" adaptor: "${COAP_ADAPTOR_NAME:JsonCoapAdaptor}" diff --git a/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java b/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java index 3765246c91..4b71cae015 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/DbConfigurationTestRule.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.server.mqtt; import org.junit.rules.TestRule; diff --git a/base-docker-compose.yml b/base-docker-compose.yml index 7047a2b15b..3aa5bb5959 100644 --- a/base-docker-compose.yml +++ b/base-docker-compose.yml @@ -1,3 +1,19 @@ +# +# Copyright © 2016-2018 The Thingsboard Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + version: '3.3' services: zookeeper: @@ -7,20 +23,15 @@ services: ports: - "2181:2181" - cassandra-tb: - build: - context: . - dockerfile: Dockerfile.cassandra - image: cassandra + cassandra: + image: cassandra:3.11.2 networks: - core ports: - "7199:7199" - "9160:9160" - "9042:9042" - volumes: - - /cassandra:/var/lib/cassandra - - ./db-schema:/docker-entrypoint-initdb.d/ + redis: image: redis:4.0 networks: diff --git a/dao/pom.xml b/dao/pom.xml index 1d2d962ba0..75ca13c667 100644 --- a/dao/pom.xml +++ b/dao/pom.xml @@ -152,22 +152,10 @@ org.apache.curator curator-x-discovery - - com.hazelcast - hazelcast-zookeeper - - - com.hazelcast - hazelcast - com.github.ben-manes.caffeine caffeine - - com.hazelcast - hazelcast-spring - org.springframework.boot spring-boot-autoconfigure diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index c0ca37a715..f84cc1d030 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -222,15 +222,14 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ public ListenableFuture findAlarmInfoByIdAsync(AlarmId alarmId) { log.trace("Executing findAlarmInfoByIdAsync [{}]", alarmId); validateId(alarmId, "Incorrect alarmId " + alarmId); - return Futures.transform(alarmDao.findAlarmByIdAsync(alarmId.getId()), - (AsyncFunction) alarm1 -> { - AlarmInfo alarmInfo = new AlarmInfo(alarm1); + return Futures.transformAsync(alarmDao.findAlarmByIdAsync(alarmId.getId()), + a -> { + AlarmInfo alarmInfo = new AlarmInfo(a); return Futures.transform( - entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function) - originatorName -> { - alarmInfo.setOriginatorName(originatorName); - return alarmInfo; - } + entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), originatorName -> { + alarmInfo.setOriginatorName(originatorName); + return alarmInfo; + } ); }); } @@ -239,18 +238,17 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ public ListenableFuture> findAlarms(AlarmQuery query) { ListenableFuture> alarms = alarmDao.findAlarms(query); if (query.getFetchOriginator() != null && query.getFetchOriginator().booleanValue()) { - alarms = Futures.transform(alarms, (AsyncFunction, List>) input -> { + alarms = Futures.transformAsync(alarms, input -> { List> alarmFutures = new ArrayList<>(input.size()); for (AlarmInfo alarmInfo : input) { alarmFutures.add(Futures.transform( - entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function) - originatorName -> { - if (originatorName == null) { - originatorName = "Deleted"; - } - alarmInfo.setOriginatorName(originatorName); - return alarmInfo; - } + entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), originatorName -> { + if (originatorName == null) { + originatorName = "Deleted"; + } + alarmInfo.setOriginatorName(originatorName); + return alarmInfo; + } )); } return Futures.successfulAsList(alarmFutures); diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java index 1233c7f753..6785f2e3b7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java @@ -102,12 +102,12 @@ public class CassandraAlarmDao extends CassandraAbstractModelDao> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, EntityType.ALARM, query.getPageLink()); - return Futures.transform(relations, (AsyncFunction, List>) input -> { + return Futures.transformAsync(relations, input -> { List> alarmFutures = new ArrayList<>(input.size()); for (EntityRelation relation : input) { alarmFutures.add(Futures.transform( findAlarmByIdAsync(relation.getTo().getId()), - (Function) AlarmInfo::new)); + AlarmInfo::new)); } return Futures.successfulAsList(alarmFutures); }); diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java index dcd9523975..7bb67f04bb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java @@ -194,10 +194,10 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ @Override public ListenableFuture> findAssetsByQuery(AssetSearchQuery query) { ListenableFuture> relations = relationService.findByQuery(query.toEntitySearchQuery()); - ListenableFuture> assets = Futures.transform(relations, (AsyncFunction, List>) relations1 -> { + ListenableFuture> assets = Futures.transformAsync(relations, r -> { EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection(); List> futures = new ArrayList<>(); - for (EntityRelation relation : relations1) { + for (EntityRelation relation : r) { EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom(); if (entityId.getEntityType() == EntityType.ASSET) { futures.add(findAssetByIdAsync(new AssetId(entityId.getId()))); diff --git a/dao/src/main/java/org/thingsboard/server/dao/cassandra/AbstractCassandraCluster.java b/dao/src/main/java/org/thingsboard/server/dao/cassandra/AbstractCassandraCluster.java index 2e56416351..67b619492b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cassandra/AbstractCassandraCluster.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cassandra/AbstractCassandraCluster.java @@ -18,8 +18,12 @@ package org.thingsboard.server.dao.cassandra; import com.datastax.driver.core.*; import com.datastax.driver.core.ProtocolOptions.Compression; +import com.datastax.driver.mapping.DefaultPropertyMapper; import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingConfiguration; import com.datastax.driver.mapping.MappingManager; +import com.datastax.driver.mapping.PropertyAccessStrategy; +import com.datastax.driver.mapping.PropertyMapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -145,7 +149,10 @@ public abstract class AbstractCassandraCluster { } else { session = cluster.connect(); } - mappingManager = new MappingManager(session); + DefaultPropertyMapper propertyMapper = new DefaultPropertyMapper(); + propertyMapper.setPropertyAccessStrategy(PropertyAccessStrategy.FIELDS); + MappingConfiguration configuration = MappingConfiguration.builder().withPropertyMapper(propertyMapper).build(); + mappingManager = new MappingManager(session, configuration); break; } catch (Exception e) { log.warn("Failed to initialize cassandra cluster due to {}. Will retry in {} ms", e.getMessage(), initRetryInterval); diff --git a/dao/src/main/java/org/thingsboard/server/dao/dashboard/CassandraDashboardInfoDao.java b/dao/src/main/java/org/thingsboard/server/dao/dashboard/CassandraDashboardInfoDao.java index 8091b2a1fe..70afed53be 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/dashboard/CassandraDashboardInfoDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/dashboard/CassandraDashboardInfoDao.java @@ -77,7 +77,7 @@ public class CassandraDashboardInfoDao extends CassandraAbstractSearchTextDao> relations = relationDao.findRelations(new CustomerId(customerId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.DASHBOARD, EntityType.DASHBOARD, pageLink); - return Futures.transform(relations, (AsyncFunction, List>) input -> { + return Futures.transformAsync(relations, input -> { List> dashboardFutures = new ArrayList<>(input.size()); for (EntityRelation relation : input) { dashboardFutures.add(findByIdAsync(relation.getTo().getId())); diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java index 9120619a74..0d19ac1e0b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java @@ -227,10 +227,10 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe @Override public ListenableFuture> findDevicesByQuery(DeviceSearchQuery query) { ListenableFuture> relations = relationService.findByQuery(query.toEntitySearchQuery()); - ListenableFuture> devices = Futures.transform(relations, (AsyncFunction, List>) relations1 -> { + ListenableFuture> devices = Futures.transformAsync(relations, r -> { EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection(); List> futures = new ArrayList<>(); - for (EntityRelation relation : relations1) { + for (EntityRelation relation : r) { EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom(); if (entityId.getEntityType() == EntityType.DEVICE) { futures.add(findDeviceByIdAsync(new DeviceId(entityId.getId()))); diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java index d2505632d7..d4aef86522 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java @@ -36,14 +36,14 @@ public class RateLimitedResultSetFuture implements ResultSetFuture { private final ListenableFuture rateLimitFuture; public RateLimitedResultSetFuture(Session session, AsyncRateLimiter rateLimiter, Statement statement) { - this.rateLimitFuture = Futures.withFallback(rateLimiter.acquireAsync(), t -> { + this.rateLimitFuture = Futures.catchingAsync(rateLimiter.acquireAsync(), Throwable.class, t -> { if (!(t instanceof BufferLimitException)) { rateLimiter.release(); } return Futures.immediateFailedFuture(t); }); this.originalFuture = Futures.transform(rateLimitFuture, - (Function) i -> executeAsyncWithRelease(rateLimiter, session, statement)); + i -> executeAsyncWithRelease(rateLimiter, session, statement)); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index 836bd3d314..9fc797c011 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -227,8 +227,8 @@ public class BaseRelationService implements RelationService { inboundRelationsListTo.add(relationDao.findAllByTo(entity, typeGroup)); } ListenableFuture>> inboundRelationsTo = Futures.allAsList(inboundRelationsListTo); - ListenableFuture> inboundDeletions = Futures.transform(inboundRelationsTo, - (AsyncFunction>, List>) relations -> { + ListenableFuture> inboundDeletions = Futures.transformAsync(inboundRelationsTo, + relations -> { List> results = getListenableFutures(relations, cache, true); return Futures.allAsList(results); }); @@ -240,7 +240,7 @@ public class BaseRelationService implements RelationService { inboundRelationsListFrom.add(relationDao.findAllByTo(entity, typeGroup)); } ListenableFuture>> inboundRelationsFrom = Futures.allAsList(inboundRelationsListFrom); - Futures.transform(inboundRelationsFrom, (AsyncFunction>, List>) relations -> { + Futures.transformAsync(inboundRelationsFrom, relations -> { List> results = getListenableFutures(relations, cache, false); return Futures.allAsList(results); }); @@ -252,7 +252,7 @@ public class BaseRelationService implements RelationService { private List> getListenableFutures(List> relations, Cache cache, boolean isRemove) { List> results = new ArrayList<>(); for (List relationList : relations) { - relationList.stream().forEach(relation -> { + relationList.forEach(relation -> { checkFromDeleteAsync(cache, results, relation, isRemove); }); } @@ -325,17 +325,16 @@ public class BaseRelationService implements RelationService { validate(from); validateTypeGroup(typeGroup); ListenableFuture> relations = relationDao.findAllByFrom(from, typeGroup); - ListenableFuture> relationsInfo = Futures.transform(relations, - (AsyncFunction, List>) relations1 -> { + return Futures.transformAsync(relations, + relations1 -> { List> futures = new ArrayList<>(); - relations1.stream().forEach(relation -> + relations1.forEach(relation -> futures.add(fetchRelationInfoAsync(relation, - relation2 -> relation2.getTo(), - (EntityRelationInfo relationInfo, String entityName) -> relationInfo.setToName(entityName))) + EntityRelation::getTo, + EntityRelationInfo::setToName)) ); return Futures.successfulAsList(futures); }); - return relationsInfo; } @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup}") @@ -381,8 +380,8 @@ public class BaseRelationService implements RelationService { validate(to); validateTypeGroup(typeGroup); ListenableFuture> relations = relationDao.findAllByTo(to, typeGroup); - ListenableFuture> relationsInfo = Futures.transform(relations, - (AsyncFunction, List>) relations1 -> { + return Futures.transformAsync(relations, + relations1 -> { List> futures = new ArrayList<>(); relations1.stream().forEach(relation -> futures.add(fetchRelationInfoAsync(relation, @@ -391,7 +390,6 @@ public class BaseRelationService implements RelationService { ); return Futures.successfulAsList(futures); }); - return relationsInfo; } private ListenableFuture fetchRelationInfoAsync(EntityRelation relation, @@ -463,8 +461,8 @@ public class BaseRelationService implements RelationService { log.trace("Executing findInfoByQuery [{}]", query); ListenableFuture> relations = findByQuery(query); EntitySearchDirection direction = query.getParameters().getDirection(); - ListenableFuture> relationsInfo = Futures.transform(relations, - (AsyncFunction, List>) relations1 -> { + return Futures.transformAsync(relations, + relations1 -> { List> futures = new ArrayList<>(); relations1.stream().forEach(relation -> futures.add(fetchRelationInfoAsync(relation, @@ -479,7 +477,6 @@ public class BaseRelationService implements RelationService { ); return Futures.successfulAsList(futures); }); - return relationsInfo; } protected void validate(EntityRelation relation) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java index 0d64d5c038..e092a47617 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java @@ -102,12 +102,12 @@ public class JpaAlarmDao extends JpaAbstractDao implements A } String relationType = BaseAlarmService.ALARM_RELATION_PREFIX + searchStatusName; ListenableFuture> relations = relationDao.findRelations(affectedEntity, relationType, RelationTypeGroup.ALARM, EntityType.ALARM, query.getPageLink()); - return Futures.transform(relations, (AsyncFunction, List>) input -> { + return Futures.transformAsync(relations, input -> { List> alarmFutures = new ArrayList<>(input.size()); for (EntityRelation relation : input) { alarmFutures.add(Futures.transform( findAlarmByIdAsync(relation.getTo().getId()), - (Function) AlarmInfo::new)); + AlarmInfo::new)); } return Futures.successfulAsList(alarmFutures); }); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java index 4d8d0b2ff6..cc64e80af6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java @@ -86,7 +86,7 @@ public class JpaDashboardInfoDao extends JpaAbstractSearchTextDao> relations = relationDao.findRelations(new CustomerId(customerId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.DASHBOARD, EntityType.DASHBOARD, pageLink); - return Futures.transform(relations, (AsyncFunction, List>) input -> { + return Futures.transformAsync(relations, input -> { List> dashboardFutures = new ArrayList<>(input.size()); for (EntityRelation relation : input) { dashboardFutures.add(findByIdAsync(relation.getTo().getId())); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index cda4b1669b..eb1ef52f5b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -217,7 +217,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem ListenableFuture> partitionsListFuture = Futures.transform(partitionsFuture, getPartitionsArrayFunction(), readResultsProcessingExecutor); - ListenableFuture> aggregationChunks = Futures.transform(partitionsListFuture, + ListenableFuture> aggregationChunks = Futures.transformAsync(partitionsListFuture, getFetchChunksAsyncFunction(entityId, key, aggregation, startTs, endTs), readResultsProcessingExecutor); return Futures.transform(aggregationChunks, new AggregatePartitionsFunction(aggregation, key, ts), readResultsProcessingExecutor); diff --git a/entrypoint-with-db-init.sh b/entrypoint-with-db-init.sh deleted file mode 100644 index 3fb3bce965..0000000000 --- a/entrypoint-with-db-init.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash - -if [[ $1 = 'cassandra' ]]; then - - until cqlsh -f/opt/cassandra/schema.cql; do - echo "cqlsh: Cassandra is unavailable - retrying" - sleep 2 - done & - -fi - -exec /docker-entrypoint.sh "$@" diff --git a/pom.xml b/pom.xml index 297512fc9a..387afbd212 100755 --- a/pom.xml +++ b/pom.xml @@ -41,10 +41,10 @@ 1.2.3 1.9.5 0.10 - 3.0.7 + 3.5.0 3.0.0.1 1.2.7 - 18.0 + 20.0 2.6.1 3.4 1.5.0 @@ -61,15 +61,13 @@ 1.4.3 2.11.0 3.0.2 - 1.0.0 + 1.12.0 1.16.18 1.1.0 4.1.22.Final 1.5.0 3.6.5 0.9.0.0 - 3.6.6 - 3.6.1 2.19.1 3.0.2 2.6.1 @@ -760,26 +758,11 @@ org.eclipse.paho.client.mqttv3 ${paho.client.version} - - com.hazelcast - hazelcast-spring - ${hazelcast.version} - org.apache.curator curator-x-discovery ${curator.version} - - com.hazelcast - hazelcast-zookeeper - ${hazelcast-zookeeper.version} - - - com.hazelcast - hazelcast - ${hazelcast.version} - io.springfox springfox-swagger-ui diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java index d223f4dd4a..ed54c624f4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java @@ -56,7 +56,7 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode processAlarm(TbContext ctx, TbMsg msg) { ListenableFuture latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType()); - return Futures.transform(latest, (AsyncFunction) a -> { + return Futures.transformAsync(latest, a -> { if (a != null && !a.getStatus().isCleared()) { return clearAlarm(ctx, msg, a); } @@ -66,9 +66,9 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) { ListenableFuture asyncDetails = buildAlarmDetails(ctx, msg, alarm.getDetails()); - return Futures.transform(asyncDetails, (AsyncFunction) details -> { + return Futures.transformAsync(asyncDetails, details -> { ListenableFuture clearFuture = ctx.getAlarmService().clearAlarm(alarm.getId(), details, System.currentTimeMillis()); - return Futures.transform(clearFuture, (AsyncFunction) cleared -> { + return Futures.transformAsync(clearFuture, cleared -> { if (cleared && details != null) { alarm.setDetails(details); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java index 5c2109b399..dcf90684b5 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateAlarmNode.java @@ -58,7 +58,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode processAlarm(TbContext ctx, TbMsg msg) { ListenableFuture latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType()); - return Futures.transform(latest, (AsyncFunction) a -> { + return Futures.transformAsync(latest, a -> { if (a == null || a.getStatus().isCleared()) { return createNewAlarm(ctx, msg); } else { @@ -70,10 +70,10 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode createNewAlarm(TbContext ctx, TbMsg msg) { ListenableFuture asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg, null), - (Function) details -> buildAlarm(msg, details, ctx.getTenantId())); + details -> buildAlarm(msg, details, ctx.getTenantId())); ListenableFuture asyncCreated = Futures.transform(asyncAlarm, - (Function) alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor()); - return Futures.transform(asyncCreated, (Function) alarm -> new AlarmResult(true, false, false, alarm)); + alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor()); + return Futures.transform(asyncCreated, alarm -> new AlarmResult(true, false, false, alarm)); } private ListenableFuture updateAlarm(TbContext ctx, TbMsg msg, Alarm alarm) { @@ -85,7 +85,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode) a -> new AlarmResult(false, true, false, a)); + return Futures.transform(asyncUpdated, a -> new AlarmResult(false, true, false, a)); } private Alarm buildAlarm(TbMsg msg, JsonNode details, TenantId tenantId) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java index 73e194539a..be72833a43 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java @@ -43,8 +43,7 @@ public class EntitiesCustomerIdAsyncLoader { } private static ListenableFuture getCustomerAsync(ListenableFuture future) { - return Futures.transform(future, (AsyncFunction) in -> { - return in != null ? Futures.immediateFuture(in.getCustomerId()) - : Futures.immediateFuture(null);}); + return Futures.transformAsync(future, in -> in != null ? Futures.immediateFuture(in.getCustomerId()) + : Futures.immediateFuture(null)); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java index 8a09504e25..9e3a639d5e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java @@ -39,9 +39,8 @@ public class EntitiesRelatedDeviceIdAsyncLoader { ListenableFuture> asyncDevices = deviceService.findDevicesByQuery(query); - return Futures.transform(asyncDevices, (AsyncFunction, DeviceId>) - d -> CollectionUtils.isNotEmpty(d) ? Futures.immediateFuture(d.get(0).getId()) - : Futures.immediateFuture(null)); + return Futures.transformAsync(asyncDevices, d -> CollectionUtils.isNotEmpty(d) ? Futures.immediateFuture(d.get(0).getId()) + : Futures.immediateFuture(null)); } private static DeviceSearchQuery buildQuery(EntityId originator, DeviceRelationsQuery deviceRelationsQuery) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java index 55be558dd7..f4de8fc79b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java @@ -38,13 +38,11 @@ public class EntitiesRelatedEntityIdAsyncLoader { EntityRelationsQuery query = buildQuery(originator, relationsQuery); ListenableFuture> asyncRelation = relationService.findByQuery(query); if (relationsQuery.getDirection() == EntitySearchDirection.FROM) { - return Futures.transform(asyncRelation, (AsyncFunction, EntityId>) - r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo()) - : Futures.immediateFuture(null)); + return Futures.transformAsync(asyncRelation, r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo()) + : Futures.immediateFuture(null)); } else if (relationsQuery.getDirection() == EntitySearchDirection.TO) { - return Futures.transform(asyncRelation, (AsyncFunction, EntityId>) - r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom()) - : Futures.immediateFuture(null)); + return Futures.transformAsync(asyncRelation, r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom()) + : Futures.immediateFuture(null)); } return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction")); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java index 3d5c64ef1d..a681d68c01 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java @@ -51,7 +51,7 @@ public class EntitiesTenantIdAsyncLoader { } private static ListenableFuture getTenantAsync(ListenableFuture future) { - return Futures.transform(future, (AsyncFunction) in -> { + return Futures.transformAsync(future, in -> { return in != null ? Futures.immediateFuture(in.getTenantId()) : Futures.immediateFuture(null);}); }