diff --git a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java index 73a27a0012..361dbfbde1 100644 --- a/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/job/task/DummyTaskProcessor.java @@ -23,10 +23,10 @@ import org.thingsboard.server.queue.task.TaskProcessor; @Component @RequiredArgsConstructor -public class DummyTaskProcessor extends TaskProcessor { +public class DummyTaskProcessor extends TaskProcessor { @Override - public void process(DummyTask task) throws Exception { + public Void process(DummyTask task) throws Exception { if (task.getProcessingTimeMs() > 0) { Thread.sleep(task.getProcessingTimeMs()); } @@ -37,6 +37,7 @@ public class DummyTaskProcessor extends TaskProcessor { String error = task.getErrors().get(task.getAttempt() - 1); throw new RuntimeException(error); } + return null; } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java index f0ba542ab9..57dd46a6e5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/task/TaskProcessor.java @@ -43,7 +43,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -public abstract class TaskProcessor { +public abstract class TaskProcessor { protected final Logger log = LoggerFactory.getLogger(getClass()); @@ -135,7 +135,7 @@ public abstract class TaskProcessor { } } - public abstract void process(T task) throws Exception; + public abstract R process(T task) throws Exception; private void reportSuccess(Task task) { TaskResult result = TaskResult.builder() diff --git a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java index 4f7538d205..eb051b19e5 100644 --- a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java +++ b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java @@ -19,7 +19,9 @@ import com.auth0.jwt.JWT; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Strings; +import lombok.SneakyThrows; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.concurrent.LazyInitializer; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.io.ByteArrayResource; import org.springframework.core.io.Resource; @@ -55,9 +57,9 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.EntityViewInfo; import org.thingsboard.server.common.data.EventInfo; -import org.thingsboard.server.common.data.ResourceExportData; import org.thingsboard.server.common.data.OtaPackage; import org.thingsboard.server.common.data.OtaPackageInfo; +import org.thingsboard.server.common.data.ResourceExportData; import org.thingsboard.server.common.data.ResourceSubType; import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest; import org.thingsboard.server.common.data.StringUtils; @@ -205,7 +207,9 @@ public class RestClient implements Closeable { private static final String JWT_TOKEN_HEADER_PARAM = "X-Authorization"; private static final long AVG_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(30); protected static final String ACTIVATE_TOKEN_REGEX = "/api/noauth/activate?activateToken="; - private final ExecutorService service = ThingsBoardExecutors.newWorkStealingPool(10, getClass()); + private final LazyInitializer executor = LazyInitializer.builder() + .setInitializer(() -> ThingsBoardExecutors.newWorkStealingPool(10, getClass())) + .get(); protected final RestTemplate restTemplate; protected final RestTemplate loginRestTemplate; protected final String baseURL; @@ -223,22 +227,30 @@ public class RestClient implements Closeable { } public RestClient(RestTemplate restTemplate, String baseURL) { + this(restTemplate, baseURL, null); + } + + public RestClient(RestTemplate restTemplate, String baseURL, String accessToken) { this.restTemplate = restTemplate; this.loginRestTemplate = new RestTemplate(restTemplate.getRequestFactory()); this.baseURL = baseURL; this.restTemplate.getInterceptors().add((request, bytes, execution) -> { HttpRequest wrapper = new HttpRequestWrapper(request); - long calculatedTs = System.currentTimeMillis() + clientServerTimeDiff + AVG_REQUEST_TIMEOUT; - if (calculatedTs > mainTokenExpTs) { - synchronized (RestClient.this) { - if (calculatedTs > mainTokenExpTs) { - if (calculatedTs < refreshTokenExpTs) { - refreshToken(); - } else { - doLogin(); + if (accessToken == null) { + long calculatedTs = System.currentTimeMillis() + clientServerTimeDiff + AVG_REQUEST_TIMEOUT; + if (calculatedTs > mainTokenExpTs) { + synchronized (RestClient.this) { + if (calculatedTs > mainTokenExpTs) { + if (calculatedTs < refreshTokenExpTs) { + refreshToken(); + } else { + doLogin(); + } } } } + } else { + mainToken = accessToken; } wrapper.getHeaders().set(JWT_TOKEN_HEADER_PARAM, "Bearer " + mainToken); return execution.execute(wrapper, bytes); @@ -2403,7 +2415,7 @@ public class RestClient implements Closeable { } public Future> getAttributeKvEntriesAsync(EntityId entityId, List keys) { - return service.submit(() -> getAttributeKvEntries(entityId, keys)); + return getExecutor().submit(() -> getAttributeKvEntries(entityId, keys)); } public List getAttributesByScope(EntityId entityId, String scope, List keys) { @@ -2976,7 +2988,7 @@ public class RestClient implements Closeable { addWidgetInfoFiltersToParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList, params); return restTemplate.exchange( baseURL + "/api/widgetTypes?" + getUrlParams(pageLink) + - getWidgetTypeInfoPageRequestUrlParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList), + getWidgetTypeInfoPageRequestUrlParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList), HttpMethod.GET, HttpEntity.EMPTY, new ParameterizedTypeReference>() { @@ -3064,7 +3076,7 @@ public class RestClient implements Closeable { addWidgetInfoFiltersToParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList, params); return restTemplate.exchange( baseURL + "/api/widgetTypesInfos?widgetsBundleId={widgetsBundleId}&" + getUrlParams(pageLink) + - getWidgetTypeInfoPageRequestUrlParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList), + getWidgetTypeInfoPageRequestUrlParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList), HttpMethod.GET, HttpEntity.EMPTY, new ParameterizedTypeReference>() { @@ -3765,7 +3777,7 @@ public class RestClient implements Closeable { } public PageData getImages(PageLink pageLink, boolean includeSystemImages) { - return this.getImages(pageLink, null, includeSystemImages); + return this.getImages(pageLink, null, includeSystemImages); } public PageData getImages(PageLink pageLink, ResourceSubType imageSubType, boolean includeSystemImages) { @@ -4175,7 +4187,14 @@ public class RestClient implements Closeable { @Override public void close() { - service.shutdown(); + if (executor.isInitialized()) { + getExecutor().shutdown(); + } + } + + @SneakyThrows + private ExecutorService getExecutor() { + return executor.get(); } }