updated guava and protobuf versions

This commit is contained in:
YevhenBondarenko 2020-03-10 14:48:12 +02:00 committed by Andrew Shvayka
parent f3a01f21f6
commit d68ef2333a
56 changed files with 308 additions and 233 deletions

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.Getter;
@ -469,7 +470,7 @@ public class ActorSystemContext {
public void onFailure(Throwable th) {
log.error("Could not save debug Event for Node", th);
}
});
}, MoreExecutors.directExecutor());
} catch (IOException ex) {
log.warn("Failed to persist rule node debug message", ex);
}
@ -522,7 +523,7 @@ public class ActorSystemContext {
public void onFailure(Throwable th) {
log.error("Could not save debug Event for Rule Chain", th);
}
});
}, MoreExecutors.directExecutor());
}
public static Exception toException(Throwable error) {

View File

@ -20,9 +20,9 @@ import com.datastax.driver.core.utils.UUIDs;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
@ -292,7 +292,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
.build();
sendToTransport(responseMsg, sessionInfo);
}
});
}, MoreExecutors.directExecutor());
}
private ListenableFuture<List<List<AttributeKvEntry>>> getAttributesKvEntries(GetAttributeRequestMsg request) {

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.controller;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
@ -30,6 +31,7 @@ import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.common.data.ClaimRequest;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
@ -44,7 +46,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageData;
import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.ClaimRequest;
import org.thingsboard.server.dao.device.claim.ClaimResponse;
import org.thingsboard.server.dao.device.claim.ClaimResult;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
@ -425,11 +426,12 @@ public class DeviceController extends BaseController {
deferredResult.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}
}
@Override
public void onFailure(Throwable t) {
deferredResult.setErrorResult(t);
}
});
}, MoreExecutors.directExecutor());
return deferredResult;
} catch (Exception e) {
throw handleException(e);
@ -466,7 +468,7 @@ public class DeviceController extends BaseController {
public void onFailure(Throwable t) {
deferredResult.setErrorResult(t);
}
});
}, MoreExecutors.directExecutor());
return deferredResult;
} catch (Exception e) {
throw handleException(e);

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.controller;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.security.access.prepost.PreAuthorize;
@ -158,7 +159,7 @@ public class EntityViewController extends BaseController {
});
}
return null;
});
}, MoreExecutors.directExecutor());
} else {
return Futures.immediateFuture(null);
}

View File

@ -22,6 +22,7 @@ import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
@ -174,7 +175,7 @@ public class TelemetryController extends BaseController {
public DeferredResult<ResponseEntity> getTimeseriesKeys(
@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException {
return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr,
(result, tenantId, entityId) -> Futures.addCallback(tsService.findAllLatest(tenantId, entityId), getTsKeysToResponseCallback(result)));
(result, tenantId, entityId) -> Futures.addCallback(tsService.findAllLatest(tenantId, entityId), getTsKeysToResponseCallback(result), MoreExecutors.directExecutor()));
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@ -210,7 +211,7 @@ public class TelemetryController extends BaseController {
List<ReadTsKvQuery> queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, interval, limit, agg))
.collect(Collectors.toList());
Futures.addCallback(tsService.findAll(tenantId, entityId, queries), getTsKvListCallback(result, useStrictDataTypes));
Futures.addCallback(tsService.findAll(tenantId, entityId, queries), getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor());
});
}
@ -462,7 +463,7 @@ public class TelemetryController extends BaseController {
} else {
future = tsService.findLatest(user.getTenantId(), entityId, toKeysList(keys));
}
Futures.addCallback(future, getTsKvListCallback(result, useStrictDataTypes));
Futures.addCallback(future, getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor());
}
private void getAttributeValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String scope, String keys) {
@ -470,9 +471,9 @@ public class TelemetryController extends BaseController {
FutureCallback<List<AttributeKvEntry>> callback = getAttributeValuesToResponseCallback(result, user, scope, entityId, keyList);
if (!StringUtils.isEmpty(scope)) {
if (keyList != null && !keyList.isEmpty()) {
Futures.addCallback(attributesService.find(user.getTenantId(), entityId, scope, keyList), callback);
Futures.addCallback(attributesService.find(user.getTenantId(), entityId, scope, keyList), callback, MoreExecutors.directExecutor());
} else {
Futures.addCallback(attributesService.findAll(user.getTenantId(), entityId, scope), callback);
Futures.addCallback(attributesService.findAll(user.getTenantId(), entityId, scope), callback, MoreExecutors.directExecutor());
}
} else {
List<ListenableFuture<List<AttributeKvEntry>>> futures = new ArrayList<>();
@ -486,12 +487,12 @@ public class TelemetryController extends BaseController {
ListenableFuture<List<AttributeKvEntry>> future = mergeAllAttributesFutures(futures);
Futures.addCallback(future, callback);
Futures.addCallback(future, callback, MoreExecutors.directExecutor());
}
}
private void getAttributeKeysCallback(@Nullable DeferredResult<ResponseEntity> result, TenantId tenantId, EntityId entityId, String scope) {
Futures.addCallback(attributesService.findAll(tenantId, entityId, scope), getAttributeKeysToResponseCallback(result));
Futures.addCallback(attributesService.findAll(tenantId, entityId, scope), getAttributeKeysToResponseCallback(result), MoreExecutors.directExecutor());
}
private void getAttributeKeysCallback(@Nullable DeferredResult<ResponseEntity> result, TenantId tenantId, EntityId entityId) {
@ -502,7 +503,7 @@ public class TelemetryController extends BaseController {
ListenableFuture<List<AttributeKvEntry>> future = mergeAllAttributesFutures(futures);
Futures.addCallback(future, getAttributeKeysToResponseCallback(result));
Futures.addCallback(future, getAttributeKeysToResponseCallback(result), MoreExecutors.directExecutor());
}
private FutureCallback<List<TsKvEntry>> getTsKeysToResponseCallback(final DeferredResult<ResponseEntity> response) {

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import delight.nashornsandbox.NashornSandbox;
import delight.nashornsandbox.NashornSandboxes;
import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
@ -28,20 +29,17 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@ -140,7 +138,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
if (maxRequestsTimeout > 0) {
result = Futures.withTimeout(result, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService);
}
Futures.addCallback(result, evalCallback);
Futures.addCallback(result, evalCallback, MoreExecutors.directExecutor());
return result;
}
@ -163,7 +161,7 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
if (maxRequestsTimeout > 0) {
result = Futures.withTimeout(result, maxRequestsTimeout, TimeUnit.MILLISECONDS, timeoutExecutorService);
}
Futures.addCallback(result, invokeCallback);
Futures.addCallback(result, invokeCallback, MoreExecutors.directExecutor());
return result;
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.script;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -40,7 +41,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@ConditionalOnProperty(prefix = "js", value = "evaluator", havingValue = "remote", matchIfMissing = true)
@ -166,7 +166,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
}
kafkaFailedMsgs.incrementAndGet();
}
});
}, MoreExecutors.directExecutor());
return Futures.transform(future, response -> {
JsInvokeProtos.JsCompileResponse compilationResult = response.getCompileResponse();
UUID compiledScriptId = new UUID(compilationResult.getScriptIdMSB(), compilationResult.getScriptIdLSB());
@ -178,7 +178,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
log.debug("[{}] Failed to compile script due to [{}]: {}", compiledScriptId, compilationResult.getErrorCode().name(), compilationResult.getErrorDetails());
throw new RuntimeException(compilationResult.getErrorDetails());
}
});
}, MoreExecutors.directExecutor());
}
@Override
@ -217,7 +217,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
}
kafkaFailedMsgs.incrementAndGet();
}
});
}, MoreExecutors.directExecutor());
return Futures.transform(future, response -> {
JsInvokeProtos.JsInvokeResponse invokeResult = response.getInvokeResponse();
if (invokeResult.getSuccess()) {
@ -226,7 +226,7 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
log.debug("[{}] Failed to compile script due to [{}]: {}", scriptId, invokeResult.getErrorCode().name(), invokeResult.getErrorDetails());
throw new RuntimeException(invokeResult.getErrorDetails());
}
});
}, MoreExecutors.directExecutor());
}
@Override

View File

@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.server.common.data.id.EntityId;
@ -121,7 +122,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
} else {
return Futures.immediateFuture(unbindMsg(json, msg));
}
});
}, MoreExecutors.directExecutor());
}
@Override
@ -174,7 +175,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
} else {
return Futures.immediateFuture(json.asBoolean());
}
});
}, MoreExecutors.directExecutor());
}
@Override
@ -232,7 +233,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S
return Futures.immediateFailedFuture(new ScriptException(e));
}
}
});
}, MoreExecutors.directExecutor());
}
public void destroy() {

View File

@ -64,14 +64,26 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.common.data.DataConstants.*;
import static org.thingsboard.server.common.data.DataConstants.ACTIVITY_EVENT;
import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT;
import static org.thingsboard.server.common.data.DataConstants.DISCONNECT_EVENT;
import static org.thingsboard.server.common.data.DataConstants.INACTIVITY_EVENT;
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
/**
* Created by ashvayka on 01.05.18.
@ -401,7 +413,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
public void onFailure(Throwable t) {
log.warn("Failed to register device to the state service", t);
}
});
}, MoreExecutors.directExecutor());
} else {
sendDeviceEvent(device.getTenantId(), device.getId(), address.get(), true, false, false);
}
@ -456,10 +468,10 @@ public class DefaultDeviceStateService implements DeviceStateService {
private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
if (persistToTelemetry) {
ListenableFuture<List<TsKvEntry>> tsData = tsService.findLatest(TenantId.SYS_TENANT_ID, device.getId(), PERSISTENT_ATTRIBUTES);
return Futures.transform(tsData, extractDeviceStateData(device));
return Futures.transform(tsData, extractDeviceStateData(device), MoreExecutors.directExecutor());
} else {
ListenableFuture<List<AttributeKvEntry>> attrData = attributesService.find(TenantId.SYS_TENANT_ID, device.getId(), DataConstants.SERVER_SCOPE, PERSISTENT_ATTRIBUTES);
return Futures.transform(attrData, extractDeviceStateData(device));
return Futures.transform(attrData, extractDeviceStateData(device), MoreExecutors.directExecutor());
}
}

View File

@ -21,6 +21,7 @@ import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -54,9 +55,6 @@ import org.thingsboard.server.service.telemetry.cmd.SubscriptionCmd;
import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmd;
import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper;
import org.thingsboard.server.service.telemetry.cmd.TimeseriesSubscriptionCmd;
import org.thingsboard.server.service.telemetry.exception.AccessDeniedException;
import org.thingsboard.server.service.telemetry.exception.EntityNotFoundException;
import org.thingsboard.server.service.telemetry.exception.InternalErrorException;
import org.thingsboard.server.service.telemetry.exception.UnauthorizedException;
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
import org.thingsboard.server.service.telemetry.sub.SubscriptionState;
@ -70,12 +68,14 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -616,7 +616,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
ListenableFuture<List<AttributeKvEntry>> future = mergeAllAttributesFutures(futures);
Futures.addCallback(future, callback);
Futures.addCallback(future, callback, MoreExecutors.directExecutor());
}
@Override
@ -630,7 +630,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
return new FutureCallback<ValidationResult>() {
@Override
public void onSuccess(@Nullable ValidationResult result) {
Futures.addCallback(attributesService.find(tenantId, entityId, scope, keys), callback);
Futures.addCallback(attributesService.find(tenantId, entityId, scope, keys), callback, MoreExecutors.directExecutor());
}
@Override
@ -650,7 +650,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
}
ListenableFuture<List<AttributeKvEntry>> future = mergeAllAttributesFutures(futures);
Futures.addCallback(future, callback);
Futures.addCallback(future, callback, MoreExecutors.directExecutor());
}
@Override
@ -664,7 +664,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi
return new FutureCallback<ValidationResult>() {
@Override
public void onSuccess(@Nullable ValidationResult result) {
Futures.addCallback(attributesService.findAll(tenantId, entityId, scope), callback);
Futures.addCallback(attributesService.findAll(tenantId, entityId, scope), callback, MoreExecutors.directExecutor());
}
@Override

View File

@ -19,10 +19,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.Device;
@ -42,19 +41,10 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponse
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.kafka.TbKafkaResponseTemplate;
import org.thingsboard.server.kafka.TbKafkaSettings;
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.state.DeviceStateService;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
/**
@ -145,7 +135,7 @@ public class LocalTransportApiService implements TransportApiService {
try {
ValidateDeviceCredentialsResponseMsg.Builder builder = ValidateDeviceCredentialsResponseMsg.newBuilder();
builder.setDeviceInfo(getDeviceInfoProto(device));
if(!StringUtils.isEmpty(credentials.getCredentialsValue())){
if (!StringUtils.isEmpty(credentials.getCredentialsValue())) {
builder.setCredentialsBody(credentials.getCredentialsValue());
}
return TransportApiResponseMsg.newBuilder()
@ -154,7 +144,7 @@ public class LocalTransportApiService implements TransportApiService {
log.warn("[{}] Failed to lookup device by id", deviceId, e);
return getEmptyTransportApiResponse();
}
});
}, MoreExecutors.directExecutor());
}
private DeviceInfoProto getDeviceInfoProto(Device device) throws JsonProcessingException {

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.kafka;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@ -59,7 +60,7 @@ public class AsyncCallbackTemplate {
if (executor != null) {
Futures.addCallback(future, callback, executor);
} else {
Futures.addCallback(future, callback);
Futures.addCallback(future, callback, MoreExecutors.directExecutor());
}
}

View File

@ -18,19 +18,20 @@ package org.thingsboard.common.util;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
public class DonAsynchron {
public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess,
Consumer<Throwable> onFailure) {
public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess,
Consumer<Throwable> onFailure) {
withCallback(future, onSuccess, onFailure, null);
}
public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess,
Consumer<Throwable> onFailure, Executor executor) {
public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess,
Consumer<Throwable> onFailure, Executor executor) {
FutureCallback<T> callback = new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
@ -49,7 +50,7 @@ public class DonAsynchron {
if (executor != null) {
Futures.addCallback(future, callback, executor);
} else {
Futures.addCallback(future, callback);
Futures.addCallback(future, callback, MoreExecutors.directExecutor());
}
}
}

View File

@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@ -53,7 +54,6 @@ import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
@ -264,9 +264,8 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
entityService.fetchEntityNameAsync(tenantId, alarmInfo.getOriginator()), originatorName -> {
alarmInfo.setOriginatorName(originatorName);
return alarmInfo;
}
);
});
}, MoreExecutors.directExecutor());
}, MoreExecutors.directExecutor());
}
@Override
@ -283,11 +282,11 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
}
alarmInfo.setOriginatorName(originatorName);
return alarmInfo;
}
}, MoreExecutors.directExecutor()
));
}
return Futures.successfulAsList(alarmFutures);
});
}, MoreExecutors.directExecutor());
}
return Futures.transform(alarms, new Function<List<AlarmInfo>, TimePageData<AlarmInfo>>() {
@Nullable
@ -295,7 +294,7 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
public TimePageData<AlarmInfo> apply(@Nullable List<AlarmInfo> alarms) {
return new TimePageData<>(alarms, query.getPageLink());
}
});
}, MoreExecutors.directExecutor());
}
@Override

View File

@ -20,6 +20,7 @@ import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -82,10 +83,10 @@ public class CassandraAlarmDao extends CassandraAbstractModelDao<AlarmEntity, Al
@Override
public Boolean deleteAlarm(TenantId tenantId, Alarm alarm) {
Statement delete = QueryBuilder.delete().all().from(getColumnFamilyName()).where(eq(ModelConstants.ID_PROPERTY, alarm.getId().getId()))
.and(eq(ALARM_TENANT_ID_PROPERTY, tenantId.getId()))
.and(eq(ALARM_ORIGINATOR_ID_PROPERTY, alarm.getOriginator().getId()))
.and(eq(ALARM_ORIGINATOR_TYPE_PROPERTY, alarm.getOriginator().getEntityType()))
.and(eq(ALARM_TYPE_PROPERTY, alarm.getType()));
.and(eq(ALARM_TENANT_ID_PROPERTY, tenantId.getId()))
.and(eq(ALARM_ORIGINATOR_ID_PROPERTY, alarm.getOriginator().getId()))
.and(eq(ALARM_ORIGINATOR_TYPE_PROPERTY, alarm.getOriginator().getEntityType()))
.and(eq(ALARM_TYPE_PROPERTY, alarm.getType()));
log.debug("Remove request: {}", delete.toString());
return executeWrite(tenantId, delete).wasApplied();
}
@ -122,10 +123,10 @@ public class CassandraAlarmDao extends CassandraAbstractModelDao<AlarmEntity, Al
for (EntityRelation relation : input) {
alarmFutures.add(Futures.transform(
findAlarmByIdAsync(tenantId, relation.getTo().getId()),
AlarmInfo::new));
AlarmInfo::new, MoreExecutors.directExecutor()));
}
return Futures.successfulAsList(alarmFutures);
});
}, MoreExecutors.directExecutor());
}
@Override

View File

@ -16,9 +16,9 @@
package org.thingsboard.server.dao.asset;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired;
@ -29,7 +29,6 @@ import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
@ -62,7 +61,10 @@ import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.CacheConstants.ASSET_CACHE;
import static org.thingsboard.server.dao.DaoUtil.toUUIDs;
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
import static org.thingsboard.server.dao.service.Validator.*;
import static org.thingsboard.server.dao.service.Validator.validateId;
import static org.thingsboard.server.dao.service.Validator.validateIds;
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
import static org.thingsboard.server.dao.service.Validator.validateString;
@Service
@Slf4j
@ -258,9 +260,9 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
}
}
return Futures.successfulAsList(futures);
});
}, MoreExecutors.directExecutor());
assets = Futures.transform(assets, assetList ->
assetList == null ? Collections.emptyList() : assetList.stream().filter(asset -> query.getAssetTypes().contains(asset.getType())).collect(Collectors.toList())
assetList == null ? Collections.emptyList() : assetList.stream().filter(asset -> query.getAssetTypes().contains(asset.getType())).collect(Collectors.toList()), MoreExecutors.directExecutor()
);
return assets;
}
@ -274,7 +276,7 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
assetTypes -> {
assetTypes.sort(Comparator.comparing(EntitySubtype::getType));
return assetTypes;
});
}, MoreExecutors.directExecutor());
}
private DataValidator<Asset> assetValidator =
@ -335,18 +337,18 @@ public class BaseAssetService extends AbstractEntityService implements AssetServ
};
private PaginatedRemover<TenantId, Asset> tenantAssetsRemover =
new PaginatedRemover<TenantId, Asset>() {
new PaginatedRemover<TenantId, Asset>() {
@Override
protected List<Asset> findEntities(TenantId tenantId, TenantId id, TextPageLink pageLink) {
return assetDao.findAssetsByTenantId(id.getId(), pageLink);
}
@Override
protected List<Asset> findEntities(TenantId tenantId, TenantId id, TextPageLink pageLink) {
return assetDao.findAssetsByTenantId(id.getId(), pageLink);
}
@Override
protected void removeEntity(TenantId tenantId, Asset entity) {
deleteAsset(tenantId, new AssetId(entity.getId().getId()));
}
};
@Override
protected void removeEntity(TenantId tenantId, Asset entity) {
deleteAsset(tenantId, new AssetId(entity.getId().getId()));
}
};
private PaginatedRemover<CustomerId, Asset> customerAssetsUnasigner = new PaginatedRemover<CustomerId, Asset>() {

View File

@ -23,6 +23,7 @@ import com.datastax.driver.mapping.Result;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EntitySubtype;
@ -185,7 +186,7 @@ public class CassandraAssetDao extends CassandraAbstractSearchTextDao<AssetEntit
return Collections.emptyList();
}
}
});
}, MoreExecutors.directExecutor());
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.dashboard;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -85,7 +86,7 @@ public class CassandraDashboardInfoDao extends CassandraAbstractSearchTextDao<Da
dashboardFutures.add(findByIdAsync(new TenantId(tenantId), relation.getTo().getId()));
}
return Futures.successfulAsList(dashboardFutures);
});
}, MoreExecutors.directExecutor());
}
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.dao.dashboard;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -64,7 +65,7 @@ public class DashboardServiceImpl extends AbstractEntityService implements Dashb
@Autowired
private TenantDao tenantDao;
@Autowired
private CustomerDao customerDao;
@ -102,7 +103,7 @@ public class DashboardServiceImpl extends AbstractEntityService implements Dashb
dashboardValidator.validate(dashboard, DashboardInfo::getTenantId);
return dashboardDao.save(dashboard.getTenantId(), dashboard);
}
@Override
public Dashboard assignDashboardToCustomer(TenantId tenantId, DashboardId dashboardId, CustomerId customerId) {
Dashboard dashboard = findDashboardById(tenantId, dashboardId);
@ -203,7 +204,7 @@ public class DashboardServiceImpl extends AbstractEntityService implements Dashb
public TimePageData<DashboardInfo> apply(@Nullable List<DashboardInfo> dashboards) {
return new TimePageData<>(dashboards, pageLink);
}
});
}, MoreExecutors.directExecutor());
}
@Override
@ -244,24 +245,24 @@ public class DashboardServiceImpl extends AbstractEntityService implements Dashb
}
}
}
};
};
private PaginatedRemover<TenantId, DashboardInfo> tenantDashboardsRemover =
new PaginatedRemover<TenantId, DashboardInfo>() {
@Override
protected List<DashboardInfo> findEntities(TenantId tenantId, TenantId id, TextPageLink pageLink) {
return dashboardInfoDao.findDashboardsByTenantId(id.getId(), pageLink);
}
@Override
protected void removeEntity(TenantId tenantId, DashboardInfo entity) {
deleteDashboard(tenantId, new DashboardId(entity.getUuidId()));
}
};
@Override
protected List<DashboardInfo> findEntities(TenantId tenantId, TenantId id, TextPageLink pageLink) {
return dashboardInfoDao.findDashboardsByTenantId(id.getId(), pageLink);
}
@Override
protected void removeEntity(TenantId tenantId, DashboardInfo entity) {
deleteDashboard(tenantId, new DashboardId(entity.getUuidId()));
}
};
private class CustomerDashboardsUnassigner extends TimePaginatedRemover<Customer, DashboardInfo> {
private Customer customer;
CustomerDashboardsUnassigner(Customer customer) {
@ -282,7 +283,7 @@ public class DashboardServiceImpl extends AbstractEntityService implements Dashb
protected void removeEntity(TenantId tenantId, DashboardInfo entity) {
unassignDashboardFromCustomer(customer.getTenantId(), new DashboardId(entity.getUuidId()), this.customer.getId());
}
}
private class CustomerDashboardsUpdater extends TimePaginatedRemover<Customer, DashboardInfo> {

View File

@ -23,6 +23,7 @@ import com.datastax.driver.mapping.Result;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.Device;
@ -178,14 +179,14 @@ public class CassandraDeviceDao extends CassandraAbstractSearchTextDao<DeviceEnt
if (result != null) {
List<EntitySubtype> entitySubtypes = new ArrayList<>();
result.all().forEach((entitySubtypeEntity) ->
entitySubtypes.add(entitySubtypeEntity.toEntitySubtype())
entitySubtypes.add(entitySubtypeEntity.toEntitySubtype())
);
return entitySubtypes;
} else {
return Collections.emptyList();
}
}
});
}, MoreExecutors.directExecutor());
}
}

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.dao.device;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -97,9 +98,9 @@ public class ClaimDevicesServiceImpl implements ClaimDevicesService {
}
log.warn("Failed to find claimingAllowed attribute for device or it is already claimed![{}]", device.getName());
throw new IllegalArgumentException();
});
}, MoreExecutors.directExecutor());
}
});
}, MoreExecutors.directExecutor());
}
private ClaimDataInfo getClaimData(Cache cache, Device device) throws ExecutionException, InterruptedException {
@ -138,9 +139,9 @@ public class ClaimDevicesServiceImpl implements ClaimDevicesService {
if (device.getCustomerId().getId().equals(ModelConstants.NULL_UUID)) {
device.setCustomerId(customerId);
Device savedDevice = deviceService.saveDevice(device);
return Futures.transform(removeClaimingSavedData(cache, claimData, device), result -> new ClaimResult(savedDevice, ClaimResponse.SUCCESS));
return Futures.transform(removeClaimingSavedData(cache, claimData, device), result -> new ClaimResult(savedDevice, ClaimResponse.SUCCESS), MoreExecutors.directExecutor());
}
return Futures.transform(removeClaimingSavedData(cache, claimData, device), result -> new ClaimResult(null, ClaimResponse.CLAIMED));
return Futures.transform(removeClaimingSavedData(cache, claimData, device), result -> new ClaimResult(null, ClaimResponse.CLAIMED), MoreExecutors.directExecutor());
}
} else {
log.warn("Failed to find the device's claiming message![{}]", device.getName());

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.dao.device;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.hibernate.exception.ConstraintViolationException;
@ -291,7 +292,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
}
}
return Futures.successfulAsList(futures);
});
}, MoreExecutors.directExecutor());
devices = Futures.transform(devices, new Function<List<Device>, List<Device>>() {
@Nullable
@ -299,7 +300,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
public List<Device> apply(@Nullable List<Device> deviceList) {
return deviceList == null ? Collections.emptyList() : deviceList.stream().filter(device -> query.getDeviceTypes().contains(device.getType())).collect(Collectors.toList());
}
});
}, MoreExecutors.directExecutor());
return devices;
}
@ -313,7 +314,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
deviceTypes -> {
deviceTypes.sort(Comparator.comparing(EntitySubtype::getType));
return deviceTypes;
});
}, MoreExecutors.directExecutor());
}
private DataValidator<Device> deviceValidator =
@ -374,18 +375,18 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
};
private PaginatedRemover<TenantId, Device> tenantDevicesRemover =
new PaginatedRemover<TenantId, Device>() {
new PaginatedRemover<TenantId, Device>() {
@Override
protected List<Device> findEntities(TenantId tenantId, TenantId id, TextPageLink pageLink) {
return deviceDao.findDevicesByTenantId(id.getId(), pageLink);
}
@Override
protected List<Device> findEntities(TenantId tenantId, TenantId id, TextPageLink pageLink) {
return deviceDao.findDevicesByTenantId(id.getId(), pageLink);
}
@Override
protected void removeEntity(TenantId tenantId, Device entity) {
deleteDevice(tenantId, new DeviceId(entity.getUuidId()));
}
};
@Override
protected void removeEntity(TenantId tenantId, Device entity) {
deleteDevice(tenantId, new DeviceId(entity.getUuidId()));
}
};
private PaginatedRemover<CustomerId, Device> customerDeviceUnasigner = new PaginatedRemover<CustomerId, Device>() {

View File

@ -18,12 +18,21 @@ package org.thingsboard.server.dao.entity;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.HasName;
import org.thingsboard.server.common.data.alarm.AlarmId;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.customer.CustomerService;
@ -109,7 +118,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
default:
throw new IllegalStateException("Not Implemented!");
}
entityName = Futures.transform(hasName, (Function<HasName, String>) hasName1 -> hasName1 != null ? hasName1.getName() : null );
entityName = Futures.transform(hasName, (Function<HasName, String>) hasName1 -> hasName1 != null ? hasName1.getName() : null, MoreExecutors.directExecutor());
return entityName;
}

View File

@ -23,6 +23,7 @@ import com.datastax.driver.mapping.Result;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EntitySubtype;
@ -97,7 +98,7 @@ public class CassandraEntityViewDao extends CassandraAbstractSearchTextDao<Entit
log.debug("Try to find entity views by tenantId [{}] and pageLink [{}]", tenantId, pageLink);
List<EntityViewEntity> entityViewEntities =
findPageWithTextSearch(new TenantId(tenantId), ENTITY_VIEW_BY_TENANT_AND_SEARCH_TEXT_CF,
Collections.singletonList(eq(TENANT_ID_PROPERTY, tenantId)), pageLink);
Collections.singletonList(eq(TENANT_ID_PROPERTY, tenantId)), pageLink);
log.trace("Found entity views [{}] by tenantId [{}] and pageLink [{}]",
entityViewEntities, tenantId, pageLink);
return DaoUtil.convertDataList(entityViewEntities);
@ -181,6 +182,6 @@ public class CassandraEntityViewDao extends CassandraAbstractSearchTextDao<Entit
return Collections.emptyList();
}
}
});
}, MoreExecutors.directExecutor());
}
}

View File

@ -19,6 +19,7 @@ import com.google.common.base.Function;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -199,7 +200,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
}
}
return Futures.successfulAsList(futures);
});
}, MoreExecutors.directExecutor());
entityViews = Futures.transform(entityViews, new Function<List<EntityView>, List<EntityView>>() {
@Nullable
@ -207,7 +208,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
public List<EntityView> apply(@Nullable List<EntityView> entityViewList) {
return entityViewList == null ? Collections.emptyList() : entityViewList.stream().filter(entityView -> query.getEntityViewTypes().contains(entityView.getType())).collect(Collectors.toList());
}
});
}, MoreExecutors.directExecutor());
return entityViews;
}
@ -246,7 +247,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
public void onFailure(Throwable t) {
log.error("Error while finding entity views by tenantId and entityId", t);
}
});
}, MoreExecutors.directExecutor());
return entityViewsFuture;
}
}
@ -279,7 +280,7 @@ public class EntityViewServiceImpl extends AbstractEntityService implements Enti
entityViewTypes -> {
entityViewTypes.sort(Comparator.comparing(EntitySubtype::getType));
return entityViewTypes;
});
}, MoreExecutors.directExecutor());
}
private DataValidator<EntityView> entityViewValidator =

View File

@ -22,6 +22,7 @@ import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
@ -45,10 +46,12 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.in;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl;
import static org.thingsboard.server.dao.model.ModelConstants.*;
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BY_ID_VIEW_NAME;
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BY_TYPE_AND_ID_VIEW_NAME;
import static org.thingsboard.server.dao.model.ModelConstants.EVENT_COLUMN_FAMILY_NAME;
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
@Component
@Slf4j
@ -96,7 +99,7 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
event.setUid(event.getId().toString());
}
ListenableFuture<Optional<Event>> optionalSave = saveAsync(event.getTenantId(), new EventEntity(event), false, eventsTtl);
return Futures.transform(optionalSave, opt -> opt.orElse(null));
return Futures.transform(optionalSave, opt -> opt.orElse(null), MoreExecutors.directExecutor());
}
@Override
@ -210,6 +213,6 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
} else {
return Optional.empty();
}
});
}, MoreExecutors.directExecutor());
}
}

View File

@ -26,6 +26,7 @@ import com.datastax.driver.mapping.Result;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.Dao;
@ -86,7 +87,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
return Collections.emptyList();
}
}
});
}, MoreExecutors.directExecutor());
}
return Futures.immediateFuture(Collections.emptyList());
}
@ -120,7 +121,7 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
return null;
}
}
});
}, MoreExecutors.directExecutor());
}
return Futures.immediateFuture(null);
}
@ -191,5 +192,5 @@ public abstract class CassandraAbstractModelDao<E extends BaseEntity<D>, D> exte
List<E> entities = findListByStatement(tenantId, QueryBuilder.select().all().from(getColumnFamilyName()).setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()));
return DaoUtil.convertDataList(entities);
}
}

View File

@ -22,6 +22,7 @@ import com.datastax.driver.core.Statement;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import org.thingsboard.server.dao.exception.BufferLimitException;
import org.thingsboard.server.dao.util.AsyncRateLimiter;
@ -44,9 +45,9 @@ public class RateLimitedResultSetFuture implements ResultSetFuture {
rateLimiter.release();
}
return Futures.immediateFailedFuture(t);
});
}, MoreExecutors.directExecutor());
this.originalFuture = Futures.transform(rateLimitFuture,
i -> executeAsyncWithRelease(rateLimiter, session, statement));
i -> executeAsyncWithRelease(rateLimiter, session, statement), MoreExecutors.directExecutor());
}
@ -145,7 +146,7 @@ public class RateLimitedResultSetFuture implements ResultSetFuture {
public void onFailure(Throwable t) {
rateLimiter.release();
}
});
}, MoreExecutors.directExecutor());
return resultSetFuture;
} catch (RuntimeException re) {
rateLimiter.release();

View File

@ -16,7 +16,10 @@
package org.thingsboard.server.dao.relation;
import com.google.common.base.Function;
import com.google.common.util.concurrent.*;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
@ -206,17 +209,20 @@ public class BaseRelationService implements RelationService {
relations -> {
List<ListenableFuture<Boolean>> results = deleteRelationGroupsAsync(tenantId, relations, cache, true);
return Futures.allAsList(results);
});
}, MoreExecutors.directExecutor());
ListenableFuture<List<Boolean>> outboundDeletions = Futures.transformAsync(outboundRelations,
relations -> {
List<ListenableFuture<Boolean>> results = deleteRelationGroupsAsync(tenantId, relations, cache, false);
return Futures.allAsList(results);
});
}, MoreExecutors.directExecutor());
ListenableFuture<List<List<Boolean>>> deletionsFuture = Futures.allAsList(inboundDeletions, outboundDeletions);
return Futures.transform(Futures.transformAsync(deletionsFuture, (deletions) -> relationDao.deleteOutboundRelationsAsync(tenantId, entityId)), result -> null);
return Futures.transform(Futures.transformAsync(deletionsFuture,
(deletions) -> relationDao.deleteOutboundRelationsAsync(tenantId, entityId),
MoreExecutors.directExecutor()),
result -> null, MoreExecutors.directExecutor());
}
private List<ListenableFuture<Boolean>> deleteRelationGroupsAsync(TenantId tenantId, List<List<EntityRelation>> relations, Cache cache, boolean deleteFromDb) {
@ -306,9 +312,11 @@ public class BaseRelationService implements RelationService {
public void onSuccess(@Nullable List<EntityRelation> result) {
cache.putIfAbsent(fromAndTypeGroup, result);
}
@Override
public void onFailure(Throwable t) {}
});
public void onFailure(Throwable t) {
}
}, MoreExecutors.directExecutor());
return relationsFuture;
}
}
@ -328,7 +336,7 @@ public class BaseRelationService implements RelationService {
EntityRelationInfo::setToName))
);
return Futures.successfulAsList(futures);
});
}, MoreExecutors.directExecutor());
}
@Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #relationType, #typeGroup, 'FROM'}")
@ -385,9 +393,11 @@ public class BaseRelationService implements RelationService {
public void onSuccess(@Nullable List<EntityRelation> result) {
cache.putIfAbsent(toAndTypeGroup, result);
}
@Override
public void onFailure(Throwable t) {}
});
public void onFailure(Throwable t) {
}
}, MoreExecutors.directExecutor());
return relationsFuture;
}
}
@ -407,7 +417,7 @@ public class BaseRelationService implements RelationService {
EntityRelationInfo::setFromName))
);
return Futures.successfulAsList(futures);
});
}, MoreExecutors.directExecutor());
}
private ListenableFuture<EntityRelationInfo> fetchRelationInfoAsync(TenantId tenantId, EntityRelation relation,
@ -418,7 +428,7 @@ public class BaseRelationService implements RelationService {
EntityRelationInfo entityRelationInfo1 = new EntityRelationInfo(relation);
entityNameSetter.accept(entityRelationInfo1, entityName1);
return entityRelationInfo1;
});
}, MoreExecutors.directExecutor());
}
@Cacheable(cacheNames = RELATIONS_CACHE, key = "{#to, #relationType, #typeGroup, 'TO'}")
@ -466,7 +476,7 @@ public class BaseRelationService implements RelationService {
}
}
return relations;
});
}, MoreExecutors.directExecutor());
} catch (Exception e) {
log.warn("Failed to query relations: [{}]", query, e);
throw new RuntimeException(e);
@ -493,7 +503,7 @@ public class BaseRelationService implements RelationService {
}))
);
return Futures.successfulAsList(futures);
});
}, MoreExecutors.directExecutor());
}
protected void validate(EntityRelation relation) {
@ -600,7 +610,7 @@ public class BaseRelationService implements RelationService {
}
//TODO: try to remove this blocking operation
List<Set<EntityRelation>> relations = Futures.successfulAsList(futures).get();
if (fetchLastLevelOnly && lvl > 0){
if (fetchLastLevelOnly && lvl > 0) {
children.clear();
}
relations.forEach(r -> r.forEach(children::add));

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.alarm;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
@ -108,9 +109,9 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
for (EntityRelation relation : input) {
alarmFutures.add(Futures.transform(
findAlarmByIdAsync(tenantId, relation.getTo().getId()),
AlarmInfo::new));
AlarmInfo::new, MoreExecutors.directExecutor()));
}
return Futures.successfulAsList(alarmFutures);
});
}, MoreExecutors.directExecutor());
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql.dashboard;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
@ -91,6 +92,6 @@ public class JpaDashboardInfoDao extends JpaAbstractSearchTextDao<DashboardInfoE
dashboardFutures.add(findByIdAsync(new TenantId(tenantId), relation.getTo().getId()));
}
return Futures.successfulAsList(dashboardFutures);
});
}, MoreExecutors.directExecutor());
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sqlts;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -168,7 +169,7 @@ public abstract class AbstractChunkedAggregationTimeseriesDao extends AbstractSq
} else {
return Optional.empty();
}
});
}, MoreExecutors.directExecutor());
}
protected void switchAggregation(TenantId tenantId, EntityId entityId, String key, long startTs, long endTs, Aggregation aggregation, List<CompletableFuture<TsKvEntity>> entitiesFutures) {

View File

@ -20,6 +20,7 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired;
@ -235,7 +236,7 @@ public abstract class AbstractSqlTimeseriesDao extends JpaAbstractDaoListeningEx
public void onFailure(Throwable t) {
log.warn("[{}] Failed to process remove of the latest value", entityId, t);
}
});
}, MoreExecutors.directExecutor());
return resultFuture;
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sqlts.timescale;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -143,7 +144,7 @@ public class TimescaleTimeseriesDao extends AbstractSqlTimeseriesDao implements
} else {
return Collections.emptyList();
}
});
}, MoreExecutors.directExecutor());
}
@Override

View File

@ -28,6 +28,7 @@ import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -330,7 +331,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
stmt.setInt(6, (int) ttl);
}
futures.add(getFuture(executeAsyncWrite(tenantId, stmt), rs -> null));
return Futures.transform(Futures.allAsList(futures), result -> null);
return Futures.transform(Futures.allAsList(futures), result -> null, MoreExecutors.directExecutor());
}
private void processSetNullValues(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry, long ttl, List<ListenableFuture<Void>> futures, long partition, DataType type) {
@ -545,7 +546,7 @@ public class CassandraBaseTimeseriesDao extends CassandraAbstractAsyncDao implem
public void onFailure(Throwable t) {
log.warn("[{}] Failed to process remove of the latest value", entityId, t);
}
});
}, MoreExecutors.directExecutor());
return resultFuture;
}

View File

@ -119,7 +119,7 @@ public class RateLimitedResultSetFutureTest {
resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one, MoreExecutors.directExecutor());
Row actualRow = transform.get();
assertSame(row, actualRow);
@ -132,7 +132,7 @@ public class RateLimitedResultSetFutureTest {
when(rateLimiter.acquireAsync()).thenReturn(Futures.immediateFuture(null));
when(session.executeAsync(statement)).thenThrow(new UnsupportedFeatureException(ProtocolVersion.V3, "hjg"));
resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one, MoreExecutors.directExecutor());
try {
transform.get();
fail();
@ -156,7 +156,7 @@ public class RateLimitedResultSetFutureTest {
when(realFuture.get()).thenThrow(new ExecutionException("Fail", new TimeoutException("timeout")));
resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one, MoreExecutors.directExecutor());
try {
transform.get();
fail();
@ -177,7 +177,7 @@ public class RateLimitedResultSetFutureTest {
when(rateLimiter.acquireAsync()).thenReturn(future);
resultSetFuture = new RateLimitedResultSetFuture(session, rateLimiter, statement);
ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one);
ListenableFuture<Row> transform = Futures.transform(resultSetFuture, ResultSet::one, MoreExecutors.directExecutor());
// TimeUnit.MILLISECONDS.sleep(200);
future.cancel(false);
latch.countDown();

View File

@ -44,7 +44,7 @@
<cassandra.version>3.6.0</cassandra.version>
<cassandra-unit.version>3.5.0.1</cassandra-unit.version>
<takari-cpsuite.version>1.2.7</takari-cpsuite.version>
<guava.version>21.0</guava.version>
<guava.version>28.2-jre</guava.version>
<caffeine.version>2.6.1</caffeine.version>
<commons-lang3.version>3.4</commons-lang3.version>
<commons-validator.version>1.6</commons-validator.version>
@ -63,7 +63,7 @@
<mail.version>1.4.3</mail.version>
<curator.version>4.2.0</curator.version>
<zookeeper.version>3.5.5</zookeeper.version>
<protobuf.version>3.6.1</protobuf.version>
<protobuf.version>3.11.4</protobuf.version>
<grpc.version>1.22.1</grpc.version>
<lombok.version>1.16.18</lombok.version>
<paho.client.version>1.1.0</paho.client.version>

View File

@ -20,6 +20,7 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@ -54,9 +55,9 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
@Slf4j
public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationActionNodeConfiguration> implements TbNode {
@ -86,7 +87,7 @@ public abstract class TbAbstractRelationActionNode<C extends TbAbstractRelationA
}
protected ListenableFuture<RelationContainer> processEntityRelationAction(TbContext ctx, TbMsg msg) {
return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer));
return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer), MoreExecutors.directExecutor());
}
protected abstract boolean createEntityIfNotExists();

View File

@ -18,12 +18,13 @@ package org.thingsboard.rule.engine.action;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmStatus;
import org.thingsboard.server.common.data.plugin.ComponentType;
@ -80,8 +81,8 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
}
alarm.setStatus(alarm.getStatus().isAck() ? AlarmStatus.CLEARED_ACK : AlarmStatus.CLEARED_UNACK);
return Futures.immediateFuture(new AlarmResult(false, false, true, alarm));
});
});
}, MoreExecutors.directExecutor());
}, MoreExecutors.directExecutor());
}, ctx.getDbCallbackExecutor());
}
}

View File

@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
@ -108,18 +109,18 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
private ListenableFuture<AlarmResult> createNewAlarm(TbContext ctx, TbMsg msg, Alarm msgAlarm) {
ListenableFuture<Alarm> asyncAlarm;
if (msgAlarm != null) {
asyncAlarm = Futures.immediateCheckedFuture(msgAlarm);
asyncAlarm = Futures.immediateFuture(msgAlarm);
} else {
ctx.logJsEvalRequest();
asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg, null),
details -> {
ctx.logJsEvalResponse();
return buildAlarm(msg, details, ctx.getTenantId());
});
}, MoreExecutors.directExecutor());
}
ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm,
alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor());
return Futures.transform(asyncCreated, alarm -> new AlarmResult(true, false, false, alarm));
return Futures.transform(asyncCreated, alarm -> new AlarmResult(true, false, false, alarm), MoreExecutors.directExecutor());
}
private ListenableFuture<AlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Alarm existingAlarm, Alarm msgAlarm) {
@ -140,7 +141,7 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
return ctx.getAlarmService().createOrUpdateAlarm(existingAlarm);
}, ctx.getDbCallbackExecutor());
return Futures.transform(asyncUpdated, a -> new AlarmResult(false, true, false, a));
return Futures.transform(asyncUpdated, a -> new AlarmResult(false, true, false, a), MoreExecutors.directExecutor());
}
private Alarm buildAlarm(TbMsg msg, JsonNode details, TenantId tenantId) {

View File

@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.action;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
@ -81,7 +82,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
}
container.setResult(result);
return container;
});
}, MoreExecutors.directExecutor());
}
private ListenableFuture<Boolean> createIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer) {
@ -120,7 +121,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
for (EntityRelation relation : entityRelations) {
list.add(ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), relation));
}
return Futures.transform(Futures.allAsList(list), result -> false);
return Futures.transform(Futures.allAsList(list), result -> false, MoreExecutors.directExecutor());
}
return Futures.immediateFuture(false);
}, ctx.getDbCallbackExecutor());
@ -161,7 +162,7 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR
} else {
return Futures.immediateFuture(true);
}
});
}, MoreExecutors.directExecutor());
}
private ListenableFuture<Boolean> processAsset(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId) {

View File

@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.action;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
@ -66,17 +67,18 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
@Override
protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entityContainer) {
return Futures.transform(processSingle(ctx, msg, entityContainer), result -> new RelationContainer(msg, result));
return Futures.transform(processSingle(ctx, msg, entityContainer), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor());
}
private ListenableFuture<RelationContainer> getRelationContainerListenableFuture(TbContext ctx, TbMsg msg) {
relationType = processPattern(msg, config.getRelationType());
if (config.isDeleteForSingleEntity()) {
return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer));
return Futures.transformAsync(getEntity(ctx, msg), entityContainer -> doProcessEntityRelationAction(ctx, msg, entityContainer), MoreExecutors.directExecutor());
} else {
return Futures.transform(processList(ctx, msg), result -> new RelationContainer(msg, result));
return Futures.transform(processList(ctx, msg), result -> new RelationContainer(msg, result), MoreExecutors.directExecutor());
}
}
private ListenableFuture<Boolean> processList(TbContext ctx, TbMsg msg) {
return Futures.transformAsync(processListSearchDirection(ctx, msg), entityRelations -> {
if (entityRelations.isEmpty()) {
@ -93,9 +95,9 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
}
}
return Futures.immediateFuture(true);
});
}, MoreExecutors.directExecutor());
}
});
}, MoreExecutors.directExecutor());
}
private ListenableFuture<Boolean> processSingle(TbContext ctx, TbMsg msg, EntityContainer entityContainer) {
@ -106,7 +108,7 @@ public class TbDeleteRelationNode extends TbAbstractRelationActionNode<TbDeleteR
return processSingleDeleteRelation(ctx, sdId);
}
return Futures.immediateFuture(true);
});
}, MoreExecutors.directExecutor());
}
private ListenableFuture<Boolean> processSingleDeleteRelation(TbContext ctx, SearchDirectionIds sdId) {

View File

@ -19,6 +19,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
@ -27,17 +28,12 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmId;
import org.thingsboard.server.common.data.alarm.AlarmStatus;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.UUID;
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@Slf4j
@RuleNode(
@ -91,7 +87,7 @@ public class TbCheckAlarmStatusNode implements TbNode {
public void onFailure(Throwable t) {
ctx.tellFailure(msg, t);
}
});
}, MoreExecutors.directExecutor());
} catch (IOException e) {
log.error("Failed to parse alarm: [{}]", msg.getData());
throw new TbNodeException(e);

View File

@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.filter;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
@ -87,10 +88,10 @@ public class TbCheckRelationNode implements TbNode {
private ListenableFuture<Boolean> processList(TbContext ctx, TbMsg msg) {
if (EntitySearchDirection.FROM.name().equals(config.getDirection())) {
return Futures.transformAsync(ctx.getRelationService()
.findByToAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), config.getRelationType(), RelationTypeGroup.COMMON), this::isEmptyList);
.findByToAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), config.getRelationType(), RelationTypeGroup.COMMON), this::isEmptyList, MoreExecutors.directExecutor());
} else {
return Futures.transformAsync(ctx.getRelationService()
.findByFromAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), config.getRelationType(), RelationTypeGroup.COMMON), this::isEmptyList);
.findByFromAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), config.getRelationType(), RelationTypeGroup.COMMON), this::isEmptyList, MoreExecutors.directExecutor());
}
}

View File

@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonParseException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
@ -122,7 +123,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
}
}
return null;
});
}, MoreExecutors.directExecutor());
}
private ListenableFuture<Void> putLatestTelemetry(TbContext ctx, EntityId entityId, TbMsg msg, String scope, List<String> keys, ConcurrentHashMap<String, List<String>> failuresMap) {
@ -152,7 +153,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
}
});
return null;
});
}, MoreExecutors.directExecutor());
}
private void putValueWithTs(TbMsg msg, TsKvEntry r) {

View File

@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.metadata;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
@ -37,15 +38,16 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
import java.lang.reflect.Type;
import java.util.Map;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@Slf4j
public abstract class TbAbstractGetEntityDetailsNode<C extends TbAbstractGetEntityDetailsNodeConfiguration> implements TbNode {
private static final Gson gson = new Gson();
private static final JsonParser jsonParser = new JsonParser();
private static final Type TYPE = new TypeToken<Map<String, String>>() {}.getType();
private static final Type TYPE = new TypeToken<Map<String, String>>() {
}.getType();
protected C config;
@ -104,7 +106,7 @@ public abstract class TbAbstractGetEntityDetailsNode<C extends TbAbstractGetEnti
} else {
return Futures.immediateFuture(null);
}
});
}, MoreExecutors.directExecutor());
}
private ListenableFuture<JsonElement> addContactProperties(JsonElement data, ListenableFuture<ContactBased> entityFuture, EntityDetails entityDetails, String prefix) {
@ -114,7 +116,7 @@ public abstract class TbAbstractGetEntityDetailsNode<C extends TbAbstractGetEnti
} else {
return Futures.immediateFuture(null);
}
});
}, MoreExecutors.directExecutor());
}
private JsonElement setProperties(ContactBased entity, JsonElement data, EntityDetails entityDetails, String prefix) {

View File

@ -17,12 +17,13 @@ package org.thingsboard.rule.engine.metadata;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
@ -60,7 +61,7 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
}
private void safeGetAttributes(TbContext ctx, TbMsg msg, T entityId) {
if(entityId == null || entityId.isNullUid()) {
if (entityId == null || entityId.isNullUid()) {
ctx.tellNext(msg, FAILURE);
return;
}
@ -73,13 +74,13 @@ public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode
private ListenableFuture<List<KvEntry>> getAttributesAsync(TbContext ctx, EntityId entityId) {
ListenableFuture<List<AttributeKvEntry>> latest = ctx.getAttributesService().find(ctx.getTenantId(), entityId, SERVER_SCOPE, config.getAttrMapping().keySet());
return Futures.transform(latest, l ->
l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()), MoreExecutors.directExecutor());
}
private ListenableFuture<List<KvEntry>> getLatestTelemetry(TbContext ctx, EntityId entityId) {
ListenableFuture<List<TsKvEntry>> latest = ctx.getTimeseriesService().findLatest(ctx.getTenantId(), entityId, config.getAttrMapping().keySet());
return Futures.transform(latest, l ->
l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()));
l.stream().map(i -> (KvEntry) i).collect(Collectors.toList()), MoreExecutors.directExecutor());
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.metadata;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
@ -63,7 +64,7 @@ public class TbGetCustomerDetailsNode extends TbAbstractGetEntityDetailsNode<TbG
} else {
return Futures.immediateFuture(null);
}
});
}, MoreExecutors.directExecutor());
}
private ListenableFuture<Customer> getCustomer(TbContext ctx, TbMsg msg) {
@ -79,7 +80,7 @@ public class TbGetCustomerDetailsNode extends TbAbstractGetEntityDetailsNode<TbG
} else {
return Futures.immediateFuture(null);
}
});
}, MoreExecutors.directExecutor());
case ASSET:
return Futures.transformAsync(ctx.getAssetService().findAssetByIdAsync(ctx.getTenantId(), new AssetId(msg.getOriginator().getId())), asset -> {
if (asset != null) {
@ -91,7 +92,7 @@ public class TbGetCustomerDetailsNode extends TbAbstractGetEntityDetailsNode<TbG
} else {
return Futures.immediateFuture(null);
}
});
}, MoreExecutors.directExecutor());
case ENTITY_VIEW:
return Futures.transformAsync(ctx.getEntityViewService().findEntityViewByIdAsync(ctx.getTenantId(), new EntityViewId(msg.getOriginator().getId())), entityView -> {
if (entityView != null) {
@ -103,7 +104,7 @@ public class TbGetCustomerDetailsNode extends TbAbstractGetEntityDetailsNode<TbG
} else {
return Futures.immediateFuture(null);
}
});
}, MoreExecutors.directExecutor());
default:
throw new RuntimeException("Entity with entityType '" + msg.getOriginator().getEntityType() + "' is not supported.");
}

View File

@ -17,16 +17,21 @@ package org.thingsboard.rule.engine.metadata;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.*;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.util.EntitiesFieldsAsyncLoader;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
import static org.thingsboard.common.util.DonAsynchron.withCallback;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
/**
* Created by ashvayka on 19.01.18.
@ -71,7 +76,7 @@ public class TbGetOriginatorFieldsNode implements TbNode {
}
});
return null;
}
}, MoreExecutors.directExecutor()
);
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.metadata;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
@ -59,6 +60,6 @@ public class TbGetTenantDetailsNode extends TbAbstractGetEntityDetailsNode<TbGet
} else {
return Futures.immediateFuture(null);
}
});
}, MoreExecutors.directExecutor());
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.util;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.alarm.Alarm;
@ -39,6 +40,6 @@ public class EntitiesAlarmOriginatorIdAsyncLoader {
return Futures.transformAsync(future, in -> {
return in != null ? Futures.immediateFuture(in.getOriginator())
: Futures.immediateFuture(null);
});
}, MoreExecutors.directExecutor());
}
}

View File

@ -15,13 +15,17 @@
*/
package org.thingsboard.rule.engine.util;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.HasCustomerId;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.UserId;
public class EntitiesCustomerIdAsyncLoader {
@ -44,6 +48,6 @@ public class EntitiesCustomerIdAsyncLoader {
private static <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) {
return Futures.transformAsync(future, in -> in != null ? Futures.immediateFuture(in.getCustomerId())
: Futures.immediateFuture(null));
: Futures.immediateFuture(null), MoreExecutors.directExecutor());
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.util;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.BaseData;
@ -66,6 +67,6 @@ public class EntitiesFieldsAsyncLoader {
ListenableFuture<T> future, Function<T, EntityFieldsData> converter) {
return Futures.transformAsync(future, in -> in != null ?
Futures.immediateFuture(converter.apply(in))
: Futures.immediateFailedFuture(new RuntimeException("Entity not found!")));
: Futures.immediateFailedFuture(new RuntimeException("Entity not found!")), MoreExecutors.directExecutor());
}
}

View File

@ -15,9 +15,9 @@
*/
package org.thingsboard.rule.engine.util;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections.CollectionUtils;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.data.DeviceRelationsQuery;
@ -40,7 +40,7 @@ public class EntitiesRelatedDeviceIdAsyncLoader {
ListenableFuture<List<Device>> asyncDevices = deviceService.findDevicesByQuery(ctx.getTenantId(), query);
return Futures.transformAsync(asyncDevices, d -> CollectionUtils.isNotEmpty(d) ? Futures.immediateFuture(d.get(0).getId())
: Futures.immediateFuture(null));
: Futures.immediateFuture(null), MoreExecutors.directExecutor());
}
private static DeviceSearchQuery buildQuery(EntityId originator, DeviceRelationsQuery deviceRelationsQuery) {

View File

@ -15,9 +15,9 @@
*/
package org.thingsboard.rule.engine.util;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections.CollectionUtils;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.data.RelationsQuery;
@ -39,10 +39,10 @@ public class EntitiesRelatedEntityIdAsyncLoader {
ListenableFuture<List<EntityRelation>> asyncRelation = relationService.findByQuery(ctx.getTenantId(), query);
if (relationsQuery.getDirection() == EntitySearchDirection.FROM) {
return Futures.transformAsync(asyncRelation, r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getTo())
: Futures.immediateFuture(null));
: Futures.immediateFuture(null), MoreExecutors.directExecutor());
} else if (relationsQuery.getDirection() == EntitySearchDirection.TO) {
return Futures.transformAsync(asyncRelation, r -> CollectionUtils.isNotEmpty(r) ? Futures.immediateFuture(r.get(0).getFrom())
: Futures.immediateFuture(null));
: Futures.immediateFuture(null), MoreExecutors.directExecutor());
}
return Futures.immediateFailedFuture(new IllegalStateException("Unknown direction"));
}

View File

@ -15,14 +15,20 @@
*/
package org.thingsboard.rule.engine.util;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.HasTenantId;
import org.thingsboard.server.common.data.alarm.AlarmId;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
public class EntitiesTenantIdAsyncLoader {
@ -51,6 +57,7 @@ public class EntitiesTenantIdAsyncLoader {
private static <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) {
return Futures.transformAsync(future, in -> {
return in != null ? Futures.immediateFuture(in.getTenantId())
: Futures.immediateFuture(null);});
: Futures.immediateFuture(null);
}, MoreExecutors.directExecutor());
}
}