diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index bf06a3a8ac..54ac41172c 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import lombok.Getter; @@ -469,7 +470,7 @@ public class ActorSystemContext { public void onFailure(Throwable th) { log.error("Could not save debug Event for Node", th); } - }); + }, MoreExecutors.directExecutor()); } catch (IOException ex) { log.warn("Failed to persist rule node debug message", ex); } @@ -522,7 +523,7 @@ public class ActorSystemContext { public void onFailure(Throwable th) { log.error("Could not save debug Event for Rule Chain", th); } - }); + }, MoreExecutors.directExecutor()); } public static Exception toException(Throwable error) { diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index acf1a3161c..2752b53090 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -20,9 +20,9 @@ import com.datastax.driver.core.utils.UUIDs; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; @@ -292,7 +292,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { .build(); sendToTransport(responseMsg, sessionInfo); } - }); + }, MoreExecutors.directExecutor()); } private ListenableFuture>> getAttributesKvEntries(GetAttributeRequestMsg request) { diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index d147d27b40..23460506e9 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -18,6 +18,7 @@ package org.thingsboard.server.controller; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PreAuthorize; @@ -30,6 +31,7 @@ import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; +import org.thingsboard.server.common.data.ClaimRequest; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; @@ -44,7 +46,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; import org.thingsboard.server.common.data.security.DeviceCredentials; -import org.thingsboard.server.common.data.ClaimRequest; import org.thingsboard.server.dao.device.claim.ClaimResponse; import org.thingsboard.server.dao.device.claim.ClaimResult; import org.thingsboard.server.dao.exception.IncorrectParameterException; @@ -425,11 +426,12 @@ public class DeviceController extends BaseController { deferredResult.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); } } + @Override public void onFailure(Throwable t) { deferredResult.setErrorResult(t); } - }); + }, MoreExecutors.directExecutor()); return deferredResult; } catch (Exception e) { throw handleException(e); @@ -466,7 +468,7 @@ public class DeviceController extends BaseController { public void onFailure(Throwable t) { deferredResult.setErrorResult(t); } - }); + }, MoreExecutors.directExecutor()); return deferredResult; } catch (Exception e) { throw handleException(e); diff --git a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java index 2f146e5738..82053652d5 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java @@ -18,6 +18,7 @@ package org.thingsboard.server.controller; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.security.access.prepost.PreAuthorize; @@ -158,7 +159,7 @@ public class EntityViewController extends BaseController { }); } return null; - }); + }, MoreExecutors.directExecutor()); } else { return Futures.immediateFuture(null); } diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index e534ebf7c4..f607d57cf6 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -22,6 +22,7 @@ import com.google.common.base.Function; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.JsonElement; import com.google.gson.JsonParseException; import com.google.gson.JsonParser; @@ -174,7 +175,7 @@ public class TelemetryController extends BaseController { public DeferredResult getTimeseriesKeys( @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException { return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr, - (result, tenantId, entityId) -> Futures.addCallback(tsService.findAllLatest(tenantId, entityId), getTsKeysToResponseCallback(result))); + (result, tenantId, entityId) -> Futures.addCallback(tsService.findAllLatest(tenantId, entityId), getTsKeysToResponseCallback(result), MoreExecutors.directExecutor())); } @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") @@ -210,7 +211,7 @@ public class TelemetryController extends BaseController { List queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, interval, limit, agg)) .collect(Collectors.toList()); - Futures.addCallback(tsService.findAll(tenantId, entityId, queries), getTsKvListCallback(result, useStrictDataTypes)); + Futures.addCallback(tsService.findAll(tenantId, entityId, queries), getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor()); }); } @@ -462,7 +463,7 @@ public class TelemetryController extends BaseController { } else { future = tsService.findLatest(user.getTenantId(), entityId, toKeysList(keys)); } - Futures.addCallback(future, getTsKvListCallback(result, useStrictDataTypes)); + Futures.addCallback(future, getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor()); } private void getAttributeValuesCallback(@Nullable DeferredResult result, SecurityUser user, EntityId entityId, String scope, String keys) { @@ -470,9 +471,9 @@ public class TelemetryController extends BaseController { FutureCallback> callback = getAttributeValuesToResponseCallback(result, user, scope, entityId, keyList); if (!StringUtils.isEmpty(scope)) { if (keyList != null && !keyList.isEmpty()) { - Futures.addCallback(attributesService.find(user.getTenantId(), entityId, scope, keyList), callback); + Futures.addCallback(attributesService.find(user.getTenantId(), entityId, scope, keyList), callback, MoreExecutors.directExecutor()); } else { - Futures.addCallback(attributesService.findAll(user.getTenantId(), entityId, scope), callback); + Futures.addCallback(attributesService.findAll(user.getTenantId(), entityId, scope), callback, MoreExecutors.directExecutor()); } } else { List>> futures = new ArrayList<>(); @@ -486,12 +487,12 @@ public class TelemetryController extends BaseController { ListenableFuture> future = mergeAllAttributesFutures(futures); - Futures.addCallback(future, callback); + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); } } private void getAttributeKeysCallback(@Nullable DeferredResult result, TenantId tenantId, EntityId entityId, String scope) { - Futures.addCallback(attributesService.findAll(tenantId, entityId, scope), getAttributeKeysToResponseCallback(result)); + Futures.addCallback(attributesService.findAll(tenantId, entityId, scope), getAttributeKeysToResponseCallback(result), MoreExecutors.directExecutor()); } private void getAttributeKeysCallback(@Nullable DeferredResult result, TenantId tenantId, EntityId entityId) { @@ -502,7 +503,7 @@ public class TelemetryController extends BaseController { ListenableFuture> future = mergeAllAttributesFutures(futures); - Futures.addCallback(future, getAttributeKeysToResponseCallback(result)); + Futures.addCallback(future, getAttributeKeysToResponseCallback(result), MoreExecutors.directExecutor()); } private FutureCallback> getTsKeysToResponseCallback(final DeferredResult response) { diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java index e4901785fe..49a6304ebb 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractNashornJsInvokeService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.script; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import delight.nashornsandbox.NashornSandbox; import delight.nashornsandbox.NashornSandboxes; import jdk.nashorn.api.scripting.NashornScriptEngineFactory; @@ -28,20 +29,17 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.thingsboard.common.util.ThingsBoardThreadFactory; -import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.script.Invocable; import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @@ -140,7 +138,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer if (maxRequestsTimeout > 0) { result = Futures.withTimeout(result, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); } - Futures.addCallback(result, evalCallback); + Futures.addCallback(result, evalCallback, MoreExecutors.directExecutor()); return result; } @@ -163,7 +161,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer if (maxRequestsTimeout > 0) { result = Futures.withTimeout(result, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService); } - Futures.addCallback(result, invokeCallback); + Futures.addCallback(result, invokeCallback, MoreExecutors.directExecutor()); return result; } diff --git a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java index 45d773f387..1a218b119b 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RemoteJsInvokeService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.script; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -40,7 +41,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; @Slf4j @ConditionalOnProperty(prefix = "js", value = "evaluator", havingValue = "remote", matchIfMissing = true) @@ -166,7 +166,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } kafkaFailedMsgs.incrementAndGet(); } - }); + }, MoreExecutors.directExecutor()); return Futures.transform(future, response -> { JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse(); UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB()); @@ -178,7 +178,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails()); throw new RuntimeException(compilationResult.getErrorDetails()); } - }); + }, MoreExecutors.directExecutor()); } @Override @@ -217,7 +217,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { } kafkaFailedMsgs.incrementAndGet(); } - }); + }, MoreExecutors.directExecutor()); return Futures.transform(future, response -> { JsInvokeProtos.JsInvokeResponse invokeResult = response.getInvokeResponse(); if (invokeResult.getSuccess()) { @@ -226,7 +226,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService { log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails()); throw new RuntimeException(invokeResult.getErrorDetails()); } - }); + }, MoreExecutors.directExecutor()); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java index 73bdd27b78..ef5d4716cb 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.thingsboard.server.common.data.id.EntityId; @@ -121,7 +122,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S } else { return Futures.immediateFuture(unbindMsg(json, msg)); } - }); + }, MoreExecutors.directExecutor()); } @Override @@ -174,7 +175,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S } else { return Futures.immediateFuture(json.asBoolean()); } - }); + }, MoreExecutors.directExecutor()); } @Override @@ -232,7 +233,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S return Futures.immediateFailedFuture(new ScriptException(e)); } } - }); + }, MoreExecutors.directExecutor()); } public void destroy() { diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index f40de180ab..18726e3a5a 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -64,14 +64,26 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import static org.thingsboard.server.common.data.DataConstants.*; +import static org.thingsboard.server.common.data.DataConstants.ACTIVITY_EVENT; +import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT; +import static org.thingsboard.server.common.data.DataConstants.DISCONNECT_EVENT; +import static org.thingsboard.server.common.data.DataConstants.INACTIVITY_EVENT; +import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; /** * Created by ashvayka on 01.05.18. @@ -401,7 +413,7 @@ public class DefaultDeviceStateService implements DeviceStateService { public void onFailure(Throwable t) { log.warn("Failed to register device to the state service", t); } - }); + }, MoreExecutors.directExecutor()); } else { sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), true, false, false); } @@ -456,10 +468,10 @@ public class DefaultDeviceStateService implements DeviceStateService { private ListenableFuture fetchDeviceState(Device device) { if (persistToTelemetry) { ListenableFuture> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES); - return Futures.transform(tsData, extractDeviceStateData(device)); + return Futures.transform(tsData, extractDeviceStateData(device), MoreExecutors.directExecutor()); } else { ListenableFuture> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES); - return Futures.transform(attrData, extractDeviceStateData(device)); + return Futures.transform(attrData, extractDeviceStateData(device), MoreExecutors.directExecutor()); } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index c4fe716ad9..741efe6f8a 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -21,6 +21,7 @@ import com.google.common.base.Function; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -54,9 +55,6 @@ import org.thingsboard.server.service.telemetry.cmd.SubscriptionCmd; import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmd; import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper; import org.thingsboard.server.service.telemetry.cmd.TimeseriesSubscriptionCmd; -import org.thingsboard.server.service.telemetry.exception.AccessDeniedException; -import org.thingsboard.server.service.telemetry.exception.EntityNotFoundException; -import org.thingsboard.server.service.telemetry.exception.InternalErrorException; import org.thingsboard.server.service.telemetry.exception.UnauthorizedException; import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode; import org.thingsboard.server.service.telemetry.sub.SubscriptionState; @@ -70,12 +68,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -616,7 +616,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } ListenableFuture> future = mergeAllAttributesFutures(futures); - Futures.addCallback(future, callback); + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); } @Override @@ -630,7 +630,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi return new FutureCallback() { @Override public void onSuccess(@Nullable ValidationResult result) { - Futures.addCallback(attributesService.find(tenantId, entityId, scope, keys), callback); + Futures.addCallback(attributesService.find(tenantId, entityId, scope, keys), callback, MoreExecutors.directExecutor()); } @Override @@ -650,7 +650,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } ListenableFuture> future = mergeAllAttributesFutures(futures); - Futures.addCallback(future, callback); + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); } @Override @@ -664,7 +664,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi return new FutureCallback() { @Override public void onSuccess(@Nullable ValidationResult result) { - Futures.addCallback(attributesService.findAll(tenantId, entityId, scope), callback); + Futures.addCallback(attributesService.findAll(tenantId, entityId, scope), callback, MoreExecutors.directExecutor()); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java index 37ff8ed93d..8e820fbffe 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java @@ -19,10 +19,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.Device; @@ -42,19 +41,10 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponse import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; -import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; -import org.thingsboard.server.kafka.TBKafkaProducerTemplate; -import org.thingsboard.server.kafka.TbKafkaResponseTemplate; -import org.thingsboard.server.kafka.TbKafkaSettings; -import org.thingsboard.server.service.cluster.discovery.DiscoveryService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.state.DeviceStateService; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock; /** @@ -145,7 +135,7 @@ public class LocalTransportApiService implements TransportApiService { try { ValidateDeviceCredentialsResponseMsg.Builder builder = ValidateDeviceCredentialsResponseMsg.newBuilder(); builder.setDeviceInfo(getDeviceInfoProto(device)); - if(!StringUtils.isEmpty(credentials.getCredentialsValue())){ + if (!StringUtils.isEmpty(credentials.getCredentialsValue())) { builder.setCredentialsBody(credentials.getCredentialsValue()); } return TransportApiResponseMsg.newBuilder() @@ -154,7 +144,7 @@ public class LocalTransportApiService implements TransportApiService { log.warn("[{}] Failed to lookup device by id", deviceId, e); return getEmptyTransportApiResponse(); } - }); + }, MoreExecutors.directExecutor()); } private DeviceInfoProto getDeviceInfoProto(Device device) throws JsonProcessingException { diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/AsyncCallbackTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/AsyncCallbackTemplate.java index 17599bfccb..a01411e48f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/AsyncCallbackTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/AsyncCallbackTemplate.java @@ -18,6 +18,7 @@ package org.thingsboard.server.kafka; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -59,7 +60,7 @@ public class AsyncCallbackTemplate { if (executor != null) { Futures.addCallback(future, callback, executor); } else { - Futures.addCallback(future, callback); + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); } } diff --git a/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java b/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java index 3557fcb40a..0940878ab2 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java +++ b/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java @@ -18,19 +18,20 @@ package org.thingsboard.common.util; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.Executor; import java.util.function.Consumer; public class DonAsynchron { - public static void withCallback(ListenableFuture future, Consumer onSuccess, - Consumer onFailure) { + public static void withCallback(ListenableFuture future, Consumer onSuccess, + Consumer onFailure) { withCallback(future, onSuccess, onFailure, null); } - public static void withCallback(ListenableFuture future, Consumer onSuccess, - Consumer onFailure, Executor executor) { + public static void withCallback(ListenableFuture future, Consumer onSuccess, + Consumer onFailure, Executor executor) { FutureCallback callback = new FutureCallback() { @Override public void onSuccess(T result) { @@ -49,7 +50,7 @@ public class DonAsynchron { if (executor != null) { Futures.addCallback(future, callback, executor); } else { - Futures.addCallback(future, callback); + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); } } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index d86e7e0f1b..453f1ea0be 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -53,7 +54,6 @@ import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -264,9 +264,8 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ entityService.fetchEntityNameAsync(tenantId, alarmInfo.getOriginator()), originatorName -> { alarmInfo.setOriginatorName(originatorName); return alarmInfo; - } - ); - }); + }, MoreExecutors.directExecutor()); + }, MoreExecutors.directExecutor()); } @Override @@ -283,11 +282,11 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ } alarmInfo.setOriginatorName(originatorName); return alarmInfo; - } + }, MoreExecutors.directExecutor() )); } return Futures.successfulAsList(alarmFutures); - }); + }, MoreExecutors.directExecutor()); } return Futures.transform(alarms, new Function, TimePageData>() { @Nullable @@ -295,7 +294,7 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ public TimePageData apply(@Nullable List alarms) { return new TimePageData<>(alarms, query.getPageLink()); } - }); + }, MoreExecutors.directExecutor()); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java index f76ab871a4..e124b3e172 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/CassandraAlarmDao.java @@ -20,6 +20,7 @@ import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -82,10 +83,10 @@ public class CassandraAlarmDao extends CassandraAbstractModelDao - assetList == null ? Collections.emptyList() : assetList.stream().filter(asset -> query.getAssetTypes().contains(asset.getType())).collect(Collectors.toList()) + assetList == null ? Collections.emptyList() : assetList.stream().filter(asset -> query.getAssetTypes().contains(asset.getType())).collect(Collectors.toList()), MoreExecutors.directExecutor() ); return assets; } @@ -274,7 +276,7 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ assetTypes -> { assetTypes.sort(Comparator.comparing(EntitySubtype::getType)); return assetTypes; - }); + }, MoreExecutors.directExecutor()); } private DataValidator assetValidator = @@ -335,18 +337,18 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ }; private PaginatedRemover tenantAssetsRemover = - new PaginatedRemover() { + new PaginatedRemover() { - @Override - protected List findEntities(TenantId tenantId, TenantId id, TextPageLink pageLink) { - return assetDao.findAssetsByTenantId(id.getId(), pageLink); - } + @Override + protected List findEntities(TenantId tenantId, TenantId id, TextPageLink pageLink) { + return assetDao.findAssetsByTenantId(id.getId(), pageLink); + } - @Override - protected void removeEntity(TenantId tenantId, Asset entity) { - deleteAsset(tenantId, new AssetId(entity.getId().getId())); - } - }; + @Override + protected void removeEntity(TenantId tenantId, Asset entity) { + deleteAsset(tenantId, new AssetId(entity.getId().getId())); + } + }; private PaginatedRemover customerAssetsUnasigner = new PaginatedRemover() { diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java b/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java index 9f3bac983e..9808e7b117 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/asset/CassandraAssetDao.java @@ -23,6 +23,7 @@ import com.datastax.driver.mapping.Result; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.EntitySubtype; @@ -185,7 +186,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao apply(@Nullable List dashboards) { return new TimePageData<>(dashboards, pageLink); } - }); + }, MoreExecutors.directExecutor()); } @Override @@ -244,24 +245,24 @@ public class DashboardServiceImpl extends AbstractEntityService implements Dashb } } } - }; - + }; + private PaginatedRemover tenantDashboardsRemover = new PaginatedRemover() { - - @Override - protected List findEntities(TenantId tenantId, TenantId id, TextPageLink pageLink) { - return dashboardInfoDao.findDashboardsByTenantId(id.getId(), pageLink); - } - @Override - protected void removeEntity(TenantId tenantId, DashboardInfo entity) { - deleteDashboard(tenantId, new DashboardId(entity.getUuidId())); - } - }; - + @Override + protected List findEntities(TenantId tenantId, TenantId id, TextPageLink pageLink) { + return dashboardInfoDao.findDashboardsByTenantId(id.getId(), pageLink); + } + + @Override + protected void removeEntity(TenantId tenantId, DashboardInfo entity) { + deleteDashboard(tenantId, new DashboardId(entity.getUuidId())); + } + }; + private class CustomerDashboardsUnassigner extends TimePaginatedRemover { - + private Customer customer; CustomerDashboardsUnassigner(Customer customer) { @@ -282,7 +283,7 @@ public class DashboardServiceImpl extends AbstractEntityService implements Dashb protected void removeEntity(TenantId tenantId, DashboardInfo entity) { unassignDashboardFromCustomer(customer.getTenantId(), new DashboardId(entity.getUuidId()), this.customer.getId()); } - + } private class CustomerDashboardsUpdater extends TimePaginatedRemover { diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java index e7becfa1ad..a01725fe52 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/CassandraDeviceDao.java @@ -23,6 +23,7 @@ import com.datastax.driver.mapping.Result; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.Device; @@ -178,14 +179,14 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao entitySubtypes = new ArrayList<>(); result.all().forEach((entitySubtypeEntity) -> - entitySubtypes.add(entitySubtypeEntity.toEntitySubtype()) + entitySubtypes.add(entitySubtypeEntity.toEntitySubtype()) ); return entitySubtypes; } else { return Collections.emptyList(); } } - }); + }, MoreExecutors.directExecutor()); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesServiceImpl.java index abd453cc05..0bfc8885be 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/ClaimDevicesServiceImpl.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.device; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -97,9 +98,9 @@ public class ClaimDevicesServiceImpl implements ClaimDevicesService { } log.warn("Failed to find claimingAllowed attribute for device or it is already claimed![{}]", device.getName()); throw new IllegalArgumentException(); - }); + }, MoreExecutors.directExecutor()); } - }); + }, MoreExecutors.directExecutor()); } private ClaimDataInfo getClaimData(Cache cache, Device device) throws ExecutionException, InterruptedException { @@ -138,9 +139,9 @@ public class ClaimDevicesServiceImpl implements ClaimDevicesService { if (device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) { device.setCustomerId(customerId); Device savedDevice = deviceService.saveDevice(device); - return Futures.transform(removeClaimingSavedData(cache, claimData, device), result -> new ClaimResult(savedDevice, ClaimResponse.SUCCESS)); + return Futures.transform(removeClaimingSavedData(cache, claimData, device), result -> new ClaimResult(savedDevice, ClaimResponse.SUCCESS), MoreExecutors.directExecutor()); } - return Futures.transform(removeClaimingSavedData(cache, claimData, device), result -> new ClaimResult(null, ClaimResponse.CLAIMED)); + return Futures.transform(removeClaimingSavedData(cache, claimData, device), result -> new ClaimResult(null, ClaimResponse.CLAIMED), MoreExecutors.directExecutor()); } } else { log.warn("Failed to find the device's claiming message![{}]", device.getName()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java index dbcbac73b8..db998b7328 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.device; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.hibernate.exception.ConstraintViolationException; @@ -291,7 +292,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe } } return Futures.successfulAsList(futures); - }); + }, MoreExecutors.directExecutor()); devices = Futures.transform(devices, new Function, List>() { @Nullable @@ -299,7 +300,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe public List apply(@Nullable List deviceList) { return deviceList == null ? Collections.emptyList() : deviceList.stream().filter(device -> query.getDeviceTypes().contains(device.getType())).collect(Collectors.toList()); } - }); + }, MoreExecutors.directExecutor()); return devices; } @@ -313,7 +314,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe deviceTypes -> { deviceTypes.sort(Comparator.comparing(EntitySubtype::getType)); return deviceTypes; - }); + }, MoreExecutors.directExecutor()); } private DataValidator deviceValidator = @@ -374,18 +375,18 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe }; private PaginatedRemover tenantDevicesRemover = - new PaginatedRemover() { + new PaginatedRemover() { - @Override - protected List findEntities(TenantId tenantId, TenantId id, TextPageLink pageLink) { - return deviceDao.findDevicesByTenantId(id.getId(), pageLink); - } + @Override + protected List findEntities(TenantId tenantId, TenantId id, TextPageLink pageLink) { + return deviceDao.findDevicesByTenantId(id.getId(), pageLink); + } - @Override - protected void removeEntity(TenantId tenantId, Device entity) { - deleteDevice(tenantId, new DeviceId(entity.getUuidId())); - } - }; + @Override + protected void removeEntity(TenantId tenantId, Device entity) { + deleteDevice(tenantId, new DeviceId(entity.getUuidId())); + } + }; private PaginatedRemover customerDeviceUnasigner = new PaginatedRemover() { diff --git a/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java b/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java index c49fcc3728..5867e505b6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java @@ -18,12 +18,21 @@ package org.thingsboard.server.dao.entity; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.alarm.AlarmId; -import org.thingsboard.server.common.data.id.*; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DashboardId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.customer.CustomerService; @@ -109,7 +118,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe default: throw new IllegalStateException("Not Implemented!"); } - entityName = Futures.transform(hasName, (Function) hasName1 -> hasName1 != null ? hasName1.getName() : null ); + entityName = Futures.transform(hasName, (Function) hasName1 -> hasName1 != null ? hasName1.getName() : null, MoreExecutors.directExecutor()); return entityName; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/entityview/CassandraEntityViewDao.java b/dao/src/main/java/org/thingsboard/server/dao/entityview/CassandraEntityViewDao.java index aabc2c172a..05fac418d2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/entityview/CassandraEntityViewDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/entityview/CassandraEntityViewDao.java @@ -23,6 +23,7 @@ import com.datastax.driver.mapping.Result; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.EntitySubtype; @@ -97,7 +98,7 @@ public class CassandraEntityViewDao extends CassandraAbstractSearchTextDao entityViewEntities = findPageWithTextSearch(new TenantId(tenantId), ENTITY_VIEW_BY_TENANT_AND_SEARCH_TEXT_CF, - Collections.singletonList(eq(TENANT_ID_PROPERTY, tenantId)), pageLink); + Collections.singletonList(eq(TENANT_ID_PROPERTY, tenantId)), pageLink); log.trace("Found entity views [{}] by tenantId [{}] and pageLink [{}]", entityViewEntities, tenantId, pageLink); return DaoUtil.convertDataList(entityViewEntities); @@ -181,6 +182,6 @@ public class CassandraEntityViewDao extends CassandraAbstractSearchTextDao, List>() { @Nullable @@ -207,7 +208,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti public List apply(@Nullable List entityViewList) { return entityViewList == null ? Collections.emptyList() : entityViewList.stream().filter(entityView -> query.getEntityViewTypes().contains(entityView.getType())).collect(Collectors.toList()); } - }); + }, MoreExecutors.directExecutor()); return entityViews; } @@ -246,7 +247,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti public void onFailure(Throwable t) { log.error("Error while finding entity views by tenantId and entityId", t); } - }); + }, MoreExecutors.directExecutor()); return entityViewsFuture; } } @@ -279,7 +280,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti entityViewTypes -> { entityViewTypes.sort(Comparator.comparing(EntitySubtype::getType)); return entityViewTypes; - }); + }, MoreExecutors.directExecutor()); } private DataValidator entityViewValidator = diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java index 1c93a5df07..bdd0201aa5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java @@ -22,6 +22,7 @@ import com.datastax.driver.core.querybuilder.Select; import com.datastax.driver.core.utils.UUIDs; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; @@ -45,10 +46,12 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; -import static com.datastax.driver.core.querybuilder.QueryBuilder.in; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl; -import static org.thingsboard.server.dao.model.ModelConstants.*; +import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BY_ID_VIEW_NAME; +import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BY_TYPE_AND_ID_VIEW_NAME; +import static org.thingsboard.server.dao.model.ModelConstants.EVENT_COLUMN_FAMILY_NAME; +import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID; @Component @Slf4j @@ -96,7 +99,7 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao> optionalSave = saveAsync(event.getTenantId(), new EventEntity(event), false, eventsTtl); - return Futures.transform(optionalSave, opt -> opt.orElse(null)); + return Futures.transform(optionalSave, opt -> opt.orElse(null), MoreExecutors.directExecutor()); } @Override @@ -210,6 +213,6 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao, D> exte return Collections.emptyList(); } } - }); + }, MoreExecutors.directExecutor()); } return Futures.immediateFuture(Collections.emptyList()); } @@ -120,7 +121,7 @@ public abstract class CassandraAbstractModelDao, D> exte return null; } } - }); + }, MoreExecutors.directExecutor()); } return Futures.immediateFuture(null); } @@ -191,5 +192,5 @@ public abstract class CassandraAbstractModelDao, D> exte List entities = findListByStatement(tenantId, QueryBuilder.select().all().from(getColumnFamilyName()).setConsistencyLevel(cluster.getDefaultReadConsistencyLevel())); return DaoUtil.convertDataList(entities); } - + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java index cb30a7f48b..ebbe451b01 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java +++ b/dao/src/main/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFuture.java @@ -22,6 +22,7 @@ import com.datastax.driver.core.Statement; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import org.thingsboard.server.dao.exception.BufferLimitException; import org.thingsboard.server.dao.util.AsyncRateLimiter; @@ -44,9 +45,9 @@ public class RateLimitedResultSetFuture implements ResultSetFuture { rateLimiter.release(); } return Futures.immediateFailedFuture(t); - }); + }, MoreExecutors.directExecutor()); this.originalFuture = Futures.transform(rateLimitFuture, - i -> executeAsyncWithRelease(rateLimiter, session, statement)); + i -> executeAsyncWithRelease(rateLimiter, session, statement), MoreExecutors.directExecutor()); } @@ -145,7 +146,7 @@ public class RateLimitedResultSetFuture implements ResultSetFuture { public void onFailure(Throwable t) { rateLimiter.release(); } - }); + }, MoreExecutors.directExecutor()); return resultSetFuture; } catch (RuntimeException re) { rateLimiter.release(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index 68b7fce6a8..22b2543489 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -16,7 +16,10 @@ package org.thingsboard.server.dao.relation; import com.google.common.base.Function; -import com.google.common.util.concurrent.*; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.Cache; @@ -206,17 +209,20 @@ public class BaseRelationService implements RelationService { relations -> { List> results = deleteRelationGroupsAsync(tenantId, relations, cache, true); return Futures.allAsList(results); - }); + }, MoreExecutors.directExecutor()); ListenableFuture> outboundDeletions = Futures.transformAsync(outboundRelations, relations -> { List> results = deleteRelationGroupsAsync(tenantId, relations, cache, false); return Futures.allAsList(results); - }); + }, MoreExecutors.directExecutor()); ListenableFuture>> deletionsFuture = Futures.allAsList(inboundDeletions, outboundDeletions); - return Futures.transform(Futures.transformAsync(deletionsFuture, (deletions) -> relationDao.deleteOutboundRelationsAsync(tenantId, entityId)), result -> null); + return Futures.transform(Futures.transformAsync(deletionsFuture, + (deletions) -> relationDao.deleteOutboundRelationsAsync(tenantId, entityId), + MoreExecutors.directExecutor()), + result -> null, MoreExecutors.directExecutor()); } private List> deleteRelationGroupsAsync(TenantId tenantId, List> relations, Cache cache, boolean deleteFromDb) { @@ -306,9 +312,11 @@ public class BaseRelationService implements RelationService { public void onSuccess(@Nullable List result) { cache.putIfAbsent(fromAndTypeGroup, result); } + @Override - public void onFailure(Throwable t) {} - }); + public void onFailure(Throwable t) { + } + }, MoreExecutors.directExecutor()); return relationsFuture; } } @@ -328,7 +336,7 @@ public class BaseRelationService implements RelationService { EntityRelationInfo::setToName)) ); return Futures.successfulAsList(futures); - }); + }, MoreExecutors.directExecutor()); } @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup, 'FROM'}") @@ -385,9 +393,11 @@ public class BaseRelationService implements RelationService { public void onSuccess(@Nullable List result) { cache.putIfAbsent(toAndTypeGroup, result); } + @Override - public void onFailure(Throwable t) {} - }); + public void onFailure(Throwable t) { + } + }, MoreExecutors.directExecutor()); return relationsFuture; } } @@ -407,7 +417,7 @@ public class BaseRelationService implements RelationService { EntityRelationInfo::setFromName)) ); return Futures.successfulAsList(futures); - }); + }, MoreExecutors.directExecutor()); } private ListenableFuture fetchRelationInfoAsync(TenantId tenantId, EntityRelation relation, @@ -418,7 +428,7 @@ public class BaseRelationService implements RelationService { EntityRelationInfo entityRelationInfo1 = new EntityRelationInfo(relation); entityNameSetter.accept(entityRelationInfo1, entityName1); return entityRelationInfo1; - }); + }, MoreExecutors.directExecutor()); } @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup, 'TO'}") @@ -466,7 +476,7 @@ public class BaseRelationService implements RelationService { } } return relations; - }); + }, MoreExecutors.directExecutor()); } catch (Exception e) { log.warn("Failed to query relations: [{}]", query, e); throw new RuntimeException(e); @@ -493,7 +503,7 @@ public class BaseRelationService implements RelationService { })) ); return Futures.successfulAsList(futures); - }); + }, MoreExecutors.directExecutor()); } protected void validate(EntityRelation relation) { @@ -600,7 +610,7 @@ public class BaseRelationService implements RelationService { } //TODO: try to remove this blocking operation List> relations = Futures.successfulAsList(futures).get(); - if (fetchLastLevelOnly && lvl > 0){ + if (fetchLastLevelOnly && lvl > 0) { children.clear(); } relations.forEach(r -> r.forEach(children::add)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java index e00d73444b..356781d297 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/alarm/JpaAlarmDao.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.alarm; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; @@ -108,9 +109,9 @@ public class JpaAlarmDao extends JpaAbstractDao implements A for (EntityRelation relation : input) { alarmFutures.add(Futures.transform( findAlarmByIdAsync(tenantId, relation.getTo().getId()), - AlarmInfo::new)); + AlarmInfo::new, MoreExecutors.directExecutor())); } return Futures.successfulAsList(alarmFutures); - }); + }, MoreExecutors.directExecutor()); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java index 64fe4af86b..aa80e8b5e4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/dashboard/JpaDashboardInfoDao.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.dashboard; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; @@ -91,6 +92,6 @@ public class JpaDashboardInfoDao extends JpaAbstractSearchTextDao> entitiesFutures) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java index 16828ee8e2..a9277ec7e2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/AbstractSqlTimeseriesDao.java @@ -20,6 +20,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.hibernate.exception.ConstraintViolationException; import org.springframework.beans.factory.annotation.Autowired; @@ -235,7 +236,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx public void onFailure(Throwable t) { log.warn("[{}] Failed to process remove of the latest value", entityId, t); } - }); + }, MoreExecutors.directExecutor()); return resultFuture; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java index 176bc712e8..4ca53a337b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/timescale/TimescaleTimeseriesDao.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sqlts.timescale; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -143,7 +144,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements } else { return Collections.emptyList(); } - }); + }, MoreExecutors.directExecutor()); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java index 8fc8b4ab8a..b96462e350 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesDao.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -330,7 +331,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem stmt.setInt(6, (int) ttl); } futures.add(getFuture(executeAsyncWrite(tenantId, stmt), rs -> null)); - return Futures.transform(Futures.allAsList(futures), result -> null); + return Futures.transform(Futures.allAsList(futures), result -> null, MoreExecutors.directExecutor()); } private void processSetNullValues(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, List> futures, long partition, DataType type) { @@ -545,7 +546,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem public void onFailure(Throwable t) { log.warn("[{}] Failed to process remove of the latest value", entityId, t); } - }); + }, MoreExecutors.directExecutor()); return resultFuture; } diff --git a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java index 76847c0e80..bb4a08a8f1 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/nosql/RateLimitedResultSetFutureTest.java @@ -119,7 +119,7 @@ public class RateLimitedResultSetFutureTest { resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); - ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one, MoreExecutors.directExecutor()); Row actualRow = transform.get(); assertSame(row, actualRow); @@ -132,7 +132,7 @@ public class RateLimitedResultSetFutureTest { when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null)); when(session.executeAsync(statement)).thenThrow(new UnsupportedFeatureException(ProtocolVersion.V3, "hjg")); resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); - ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one, MoreExecutors.directExecutor()); try { transform.get(); fail(); @@ -156,7 +156,7 @@ public class RateLimitedResultSetFutureTest { when(realFuture.get()).thenThrow(new ExecutionException("Fail", new TimeoutException("timeout"))); resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); - ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one, MoreExecutors.directExecutor()); try { transform.get(); fail(); @@ -177,7 +177,7 @@ public class RateLimitedResultSetFutureTest { when(rateLimiter.acquireAsync()).thenReturn(future); resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement); - ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one); + ListenableFuture transform = Futures.transform(resultSetFuture, ResultSet::one, MoreExecutors.directExecutor()); // TimeUnit.MILLISECONDS.sleep(200); future.cancel(false); latch.countDown(); diff --git a/pom.xml b/pom.xml index 547c97448f..ab7e658abf 100755 --- a/pom.xml +++ b/pom.xml @@ -44,7 +44,7 @@ 3.6.0 3.5.0.1 1.2.7 - 21.0 + 28.2-jre 2.6.1 3.4 1.6 @@ -63,7 +63,7 @@ 1.4.3 4.2.0 3.5.5 - 3.6.1 + 3.11.4 1.22.1 1.16.18 1.1.0 diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractRelationActionNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractRelationActionNode.java index bcfabf1100..3d2cb5d8c8 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractRelationActionNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractRelationActionNode.java @@ -20,6 +20,7 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -54,9 +55,9 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; +import static org.thingsboard.common.util.DonAsynchron.withCallback; import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; -import static org.thingsboard.common.util.DonAsynchron.withCallback; @Slf4j public abstract class TbAbstractRelationActionNode implements TbNode { @@ -86,7 +87,7 @@ public abstract class TbAbstractRelationActionNode processEntityRelationAction(TbContext ctx, TbMsg msg) { - return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer)); + return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer), MoreExecutors.directExecutor()); } protected abstract boolean createEntityIfNotExists(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java index d787d6b7ab..63fb59bed5 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbClearAlarmNode.java @@ -18,12 +18,13 @@ package org.thingsboard.rule.engine.action; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmStatus; import org.thingsboard.server.common.data.plugin.ComponentType; @@ -80,8 +81,8 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode createNewAlarm(TbContext ctx, TbMsg msg, Alarm msgAlarm) { ListenableFuture asyncAlarm; if (msgAlarm != null) { - asyncAlarm = Futures.immediateCheckedFuture(msgAlarm); + asyncAlarm = Futures.immediateFuture(msgAlarm); } else { ctx.logJsEvalRequest(); asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg, null), details -> { ctx.logJsEvalResponse(); return buildAlarm(msg, details, ctx.getTenantId()); - }); + }, MoreExecutors.directExecutor()); } ListenableFuture asyncCreated = Futures.transform(asyncAlarm, alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor()); - return Futures.transform(asyncCreated, alarm -> new AlarmResult(true, false, false, alarm)); + return Futures.transform(asyncCreated, alarm -> new AlarmResult(true, false, false, alarm), MoreExecutors.directExecutor()); } private ListenableFuture updateAlarm(TbContext ctx, TbMsg msg, Alarm existingAlarm, Alarm msgAlarm) { @@ -140,7 +141,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode new AlarmResult(false, true, false, a)); + return Futures.transform(asyncUpdated, a -> new AlarmResult(false, true, false, a), MoreExecutors.directExecutor()); } private Alarm buildAlarm(TbMsg msg, JsonNode details, TenantId tenantId) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateRelationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateRelationNode.java index ee6966f588..de74551c22 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateRelationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCreateRelationNode.java @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.action; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -81,7 +82,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode createIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer) { @@ -120,7 +121,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode false); + return Futures.transform(Futures.allAsList(list), result -> false, MoreExecutors.directExecutor()); } return Futures.immediateFuture(false); }, ctx.getDbCallbackExecutor()); @@ -161,7 +162,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode processAsset(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeleteRelationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeleteRelationNode.java index 671829f63f..9af3708fcd 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeleteRelationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeleteRelationNode.java @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.action; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -66,17 +67,18 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer) { - return Futures.transform(processSingle(ctx, msg, entityContainer), result -> new RelationContainer(msg, result)); + return Futures.transform(processSingle(ctx, msg, entityContainer), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor()); } private ListenableFuture getRelationContainerListenableFuture(TbContext ctx, TbMsg msg) { relationType = processPattern(msg, config.getRelationType()); if (config.isDeleteForSingleEntity()) { - return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer)); + return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer), MoreExecutors.directExecutor()); } else { - return Futures.transform(processList(ctx, msg), result -> new RelationContainer(msg, result)); + return Futures.transform(processList(ctx, msg), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor()); } } + private ListenableFuture processList(TbContext ctx, TbMsg msg) { return Futures.transformAsync(processListSearchDirection(ctx, msg), entityRelations -> { if (entityRelations.isEmpty()) { @@ -93,9 +95,9 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode processSingle(TbContext ctx, TbMsg msg, EntityContainer entityContainer) { @@ -106,7 +108,7 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode processSingleDeleteRelation(TbContext ctx, SearchDirectionIds sdId) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckAlarmStatusNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckAlarmStatusNode.java index e0ec9300d4..086e3aded0 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckAlarmStatusNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckAlarmStatusNode.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -27,17 +28,12 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.alarm.Alarm; -import org.thingsboard.server.common.data.alarm.AlarmId; import org.thingsboard.server.common.data.alarm.AlarmStatus; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import javax.annotation.Nullable; import java.io.IOException; -import java.util.UUID; - -import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; -import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; @Slf4j @RuleNode( @@ -91,7 +87,7 @@ public class TbCheckAlarmStatusNode implements TbNode { public void onFailure(Throwable t) { ctx.tellFailure(msg, t); } - }); + }, MoreExecutors.directExecutor()); } catch (IOException e) { log.error("Failed to parse alarm: [{}]", msg.getData()); throw new TbNodeException(e); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java index 8fb64bbadf..89cd986b7e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbCheckRelationNode.java @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.filter; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -87,10 +88,10 @@ public class TbCheckRelationNode implements TbNode { private ListenableFuture processList(TbContext ctx, TbMsg msg) { if (EntitySearchDirection.FROM.name().equals(config.getDirection())) { return Futures.transformAsync(ctx.getRelationService() - .findByToAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), config.getRelationType(), RelationTypeGroup.COMMON), this::isEmptyList); + .findByToAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), config.getRelationType(), RelationTypeGroup.COMMON), this::isEmptyList, MoreExecutors.directExecutor()); } else { return Futures.transformAsync(ctx.getRelationService() - .findByFromAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), config.getRelationType(), RelationTypeGroup.COMMON), this::isEmptyList); + .findByFromAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), config.getRelationType(), RelationTypeGroup.COMMON), this::isEmptyList, MoreExecutors.directExecutor()); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java index be4e6b4e30..0bf1c23cc7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.JsonParseException; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.BooleanUtils; @@ -122,7 +123,7 @@ public abstract class TbAbstractGetAttributesNode putLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List keys, ConcurrentHashMap> failuresMap) { @@ -152,7 +153,7 @@ public abstract class TbAbstractGetAttributesNode implements TbNode { private static final Gson gson = new Gson(); private static final JsonParser jsonParser = new JsonParser(); - private static final Type TYPE = new TypeToken>() {}.getType(); + private static final Type TYPE = new TypeToken>() { + }.getType(); protected C config; @@ -104,7 +106,7 @@ public abstract class TbAbstractGetEntityDetailsNode addContactProperties(JsonElement data, ListenableFuture entityFuture, EntityDetails entityDetails, String prefix) { @@ -114,7 +116,7 @@ public abstract class TbAbstractGetEntityDetailsNode implements TbNode } private void safeGetAttributes(TbContext ctx, TbMsg msg, T entityId) { - if(entityId == null || entityId.isNullUid()) { + if (entityId == null || entityId.isNullUid()) { ctx.tellNext(msg, FAILURE); return; } @@ -73,13 +74,13 @@ public abstract class TbEntityGetAttrNode implements TbNode private ListenableFuture> getAttributesAsync(TbContext ctx, EntityId entityId) { ListenableFuture> latest = ctx.getAttributesService().find(ctx.getTenantId(), entityId, SERVER_SCOPE, config.getAttrMapping().keySet()); return Futures.transform(latest, l -> - l.stream().map(i -> (KvEntry) i).collect(Collectors.toList())); + l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()), MoreExecutors.directExecutor()); } private ListenableFuture> getLatestTelemetry(TbContext ctx, EntityId entityId) { ListenableFuture> latest = ctx.getTimeseriesService().findLatest(ctx.getTenantId(), entityId, config.getAttrMapping().keySet()); return Futures.transform(latest, l -> - l.stream().map(i -> (KvEntry) i).collect(Collectors.toList())); + l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()), MoreExecutors.directExecutor()); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerDetailsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerDetailsNode.java index 861e7330c5..f185d02965 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerDetailsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbGetCustomerDetailsNode.java @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.metadata; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; @@ -63,7 +64,7 @@ public class TbGetCustomerDetailsNode extends TbAbstractGetEntityDetailsNode getCustomer(TbContext ctx, TbMsg msg) { @@ -79,7 +80,7 @@ public class TbGetCustomerDetailsNode extends TbAbstractGetEntityDetailsNode { if (asset != null) { @@ -91,7 +92,7 @@ public class TbGetCustomerDetailsNode extends TbAbstractGetEntityDetailsNode { if (entityView != null) { @@ -103,7 +104,7 @@ public class TbGetCustomerDetailsNode extends TbAbstractGetEntityDetailsNode { return in != null ? Futures.immediateFuture(in.getOriginator()) : Futures.immediateFuture(null); - }); + }, MoreExecutors.directExecutor()); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java index ae1b54fffd..602ea8452b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesCustomerIdAsyncLoader.java @@ -15,13 +15,17 @@ */ package org.thingsboard.rule.engine.util; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.HasCustomerId; -import org.thingsboard.server.common.data.id.*; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.UserId; public class EntitiesCustomerIdAsyncLoader { @@ -44,6 +48,6 @@ public class EntitiesCustomerIdAsyncLoader { private static ListenableFuture getCustomerAsync(ListenableFuture future) { return Futures.transformAsync(future, in -> in != null ? Futures.immediateFuture(in.getCustomerId()) - : Futures.immediateFuture(null)); + : Futures.immediateFuture(null), MoreExecutors.directExecutor()); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoader.java index 74d586e1a7..a0a1c8629f 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesFieldsAsyncLoader.java @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.util; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.BaseData; @@ -66,6 +67,6 @@ public class EntitiesFieldsAsyncLoader { ListenableFuture future, Function converter) { return Futures.transformAsync(future, in -> in != null ? Futures.immediateFuture(converter.apply(in)) - : Futures.immediateFailedFuture(new RuntimeException("Entity not found!"))); + : Futures.immediateFailedFuture(new RuntimeException("Entity not found!")), MoreExecutors.directExecutor()); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java index b264bede07..e06113df8e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedDeviceIdAsyncLoader.java @@ -15,9 +15,9 @@ */ package org.thingsboard.rule.engine.util; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.collections.CollectionUtils; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.data.DeviceRelationsQuery; @@ -40,7 +40,7 @@ public class EntitiesRelatedDeviceIdAsyncLoader { ListenableFuture> asyncDevices = deviceService.findDevicesByQuery(ctx.getTenantId(), query); return Futures.transformAsync(asyncDevices, d -> CollectionUtils.isNotEmpty(d) ? Futures.immediateFuture(d.get(0).getId()) - : Futures.immediateFuture(null)); + : Futures.immediateFuture(null), MoreExecutors.directExecutor()); } private static DeviceSearchQuery buildQuery(EntityId originator, DeviceRelationsQuery deviceRelationsQuery) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java index 39b2817761..a478b6b903 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesRelatedEntityIdAsyncLoader.java @@ -15,9 +15,9 @@ */ package org.thingsboard.rule.engine.util; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.collections.CollectionUtils; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.data.RelationsQuery; @@ -39,10 +39,10 @@ public class EntitiesRelatedEntityIdAsyncLoader { ListenableFuture> asyncRelation = relationService.findByQuery(ctx.getTenantId(), query); if (relationsQuery.getDirection() == EntitySearchDirection.FROM) { return Futures.transformAsync(asyncRelation, r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo()) - : Futures.immediateFuture(null)); + : Futures.immediateFuture(null), MoreExecutors.directExecutor()); } else if (relationsQuery.getDirection() == EntitySearchDirection.TO) { return Futures.transformAsync(asyncRelation, r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom()) - : Futures.immediateFuture(null)); + : Futures.immediateFuture(null), MoreExecutors.directExecutor()); } return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction")); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java index 1a2ff9a1c1..3ff25e1e8b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/EntitiesTenantIdAsyncLoader.java @@ -15,14 +15,20 @@ */ package org.thingsboard.rule.engine.util; -import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.alarm.AlarmId; -import org.thingsboard.server.common.data.id.*; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UserId; public class EntitiesTenantIdAsyncLoader { @@ -51,6 +57,7 @@ public class EntitiesTenantIdAsyncLoader { private static ListenableFuture getTenantAsync(ListenableFuture future) { return Futures.transformAsync(future, in -> { return in != null ? Futures.immediateFuture(in.getTenantId()) - : Futures.immediateFuture(null);}); + : Futures.immediateFuture(null); + }, MoreExecutors.directExecutor()); } }