Minor refactoring
This commit is contained in:
		
							parent
							
								
									44a4d9d690
								
							
						
					
					
						commit
						d357be9209
					
				@ -23,10 +23,10 @@ import org.thingsboard.server.queue.task.TaskProcessor;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
@Component
 | 
					@Component
 | 
				
			||||||
@RequiredArgsConstructor
 | 
					@RequiredArgsConstructor
 | 
				
			||||||
public class DummyTaskProcessor extends TaskProcessor<DummyTask> {
 | 
					public class DummyTaskProcessor extends TaskProcessor<DummyTask, Void> {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void process(DummyTask task) throws Exception {
 | 
					    public Void process(DummyTask task) throws Exception {
 | 
				
			||||||
        if (task.getProcessingTimeMs() > 0) {
 | 
					        if (task.getProcessingTimeMs() > 0) {
 | 
				
			||||||
            Thread.sleep(task.getProcessingTimeMs());
 | 
					            Thread.sleep(task.getProcessingTimeMs());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@ -37,6 +37,7 @@ public class DummyTaskProcessor extends TaskProcessor<DummyTask> {
 | 
				
			|||||||
            String error = task.getErrors().get(task.getAttempt() - 1);
 | 
					            String error = task.getErrors().get(task.getAttempt() - 1);
 | 
				
			||||||
            throw new RuntimeException(error);
 | 
					            throw new RuntimeException(error);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					        return null;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
 | 
				
			|||||||
@ -43,7 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
 | 
				
			|||||||
import java.util.concurrent.ExecutorService;
 | 
					import java.util.concurrent.ExecutorService;
 | 
				
			||||||
import java.util.concurrent.Executors;
 | 
					import java.util.concurrent.Executors;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
public abstract class TaskProcessor<T extends Task> {
 | 
					public abstract class TaskProcessor<T extends Task, R> {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    protected final Logger log = LoggerFactory.getLogger(getClass());
 | 
					    protected final Logger log = LoggerFactory.getLogger(getClass());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -135,7 +135,7 @@ public abstract class TaskProcessor<T extends Task> {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public abstract void process(T task) throws Exception;
 | 
					    public abstract R process(T task) throws Exception;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void reportSuccess(Task task) {
 | 
					    private void reportSuccess(Task task) {
 | 
				
			||||||
        TaskResult result = TaskResult.builder()
 | 
					        TaskResult result = TaskResult.builder()
 | 
				
			||||||
 | 
				
			|||||||
@ -19,7 +19,9 @@ import com.auth0.jwt.JWT;
 | 
				
			|||||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
					import com.fasterxml.jackson.databind.JsonNode;
 | 
				
			||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
 | 
					import com.fasterxml.jackson.databind.node.ObjectNode;
 | 
				
			||||||
import com.google.common.base.Strings;
 | 
					import com.google.common.base.Strings;
 | 
				
			||||||
 | 
					import lombok.SneakyThrows;
 | 
				
			||||||
import org.apache.commons.io.IOUtils;
 | 
					import org.apache.commons.io.IOUtils;
 | 
				
			||||||
 | 
					import org.apache.commons.lang3.concurrent.LazyInitializer;
 | 
				
			||||||
import org.springframework.core.ParameterizedTypeReference;
 | 
					import org.springframework.core.ParameterizedTypeReference;
 | 
				
			||||||
import org.springframework.core.io.ByteArrayResource;
 | 
					import org.springframework.core.io.ByteArrayResource;
 | 
				
			||||||
import org.springframework.core.io.Resource;
 | 
					import org.springframework.core.io.Resource;
 | 
				
			||||||
@ -55,9 +57,9 @@ import org.thingsboard.server.common.data.EntityType;
 | 
				
			|||||||
import org.thingsboard.server.common.data.EntityView;
 | 
					import org.thingsboard.server.common.data.EntityView;
 | 
				
			||||||
import org.thingsboard.server.common.data.EntityViewInfo;
 | 
					import org.thingsboard.server.common.data.EntityViewInfo;
 | 
				
			||||||
import org.thingsboard.server.common.data.EventInfo;
 | 
					import org.thingsboard.server.common.data.EventInfo;
 | 
				
			||||||
import org.thingsboard.server.common.data.ResourceExportData;
 | 
					 | 
				
			||||||
import org.thingsboard.server.common.data.OtaPackage;
 | 
					import org.thingsboard.server.common.data.OtaPackage;
 | 
				
			||||||
import org.thingsboard.server.common.data.OtaPackageInfo;
 | 
					import org.thingsboard.server.common.data.OtaPackageInfo;
 | 
				
			||||||
 | 
					import org.thingsboard.server.common.data.ResourceExportData;
 | 
				
			||||||
import org.thingsboard.server.common.data.ResourceSubType;
 | 
					import org.thingsboard.server.common.data.ResourceSubType;
 | 
				
			||||||
import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest;
 | 
					import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest;
 | 
				
			||||||
import org.thingsboard.server.common.data.StringUtils;
 | 
					import org.thingsboard.server.common.data.StringUtils;
 | 
				
			||||||
@ -205,7 +207,9 @@ public class RestClient implements Closeable {
 | 
				
			|||||||
    private static final String JWT_TOKEN_HEADER_PARAM = "X-Authorization";
 | 
					    private static final String JWT_TOKEN_HEADER_PARAM = "X-Authorization";
 | 
				
			||||||
    private static final long AVG_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
 | 
					    private static final long AVG_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
 | 
				
			||||||
    protected static final String ACTIVATE_TOKEN_REGEX = "/api/noauth/activate?activateToken=";
 | 
					    protected static final String ACTIVATE_TOKEN_REGEX = "/api/noauth/activate?activateToken=";
 | 
				
			||||||
    private final ExecutorService service = ThingsBoardExecutors.newWorkStealingPool(10, getClass());
 | 
					    private final LazyInitializer<ExecutorService> executor = LazyInitializer.<ExecutorService>builder()
 | 
				
			||||||
 | 
					            .setInitializer(() -> ThingsBoardExecutors.newWorkStealingPool(10, getClass()))
 | 
				
			||||||
 | 
					            .get();
 | 
				
			||||||
    protected final RestTemplate restTemplate;
 | 
					    protected final RestTemplate restTemplate;
 | 
				
			||||||
    protected final RestTemplate loginRestTemplate;
 | 
					    protected final RestTemplate loginRestTemplate;
 | 
				
			||||||
    protected final String baseURL;
 | 
					    protected final String baseURL;
 | 
				
			||||||
@ -223,22 +227,30 @@ public class RestClient implements Closeable {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public RestClient(RestTemplate restTemplate, String baseURL) {
 | 
					    public RestClient(RestTemplate restTemplate, String baseURL) {
 | 
				
			||||||
 | 
					        this(restTemplate, baseURL, null);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    public RestClient(RestTemplate restTemplate, String baseURL, String accessToken) {
 | 
				
			||||||
        this.restTemplate = restTemplate;
 | 
					        this.restTemplate = restTemplate;
 | 
				
			||||||
        this.loginRestTemplate = new RestTemplate(restTemplate.getRequestFactory());
 | 
					        this.loginRestTemplate = new RestTemplate(restTemplate.getRequestFactory());
 | 
				
			||||||
        this.baseURL = baseURL;
 | 
					        this.baseURL = baseURL;
 | 
				
			||||||
        this.restTemplate.getInterceptors().add((request, bytes, execution) -> {
 | 
					        this.restTemplate.getInterceptors().add((request, bytes, execution) -> {
 | 
				
			||||||
            HttpRequest wrapper = new HttpRequestWrapper(request);
 | 
					            HttpRequest wrapper = new HttpRequestWrapper(request);
 | 
				
			||||||
            long calculatedTs = System.currentTimeMillis() + clientServerTimeDiff + AVG_REQUEST_TIMEOUT;
 | 
					            if (accessToken == null) {
 | 
				
			||||||
            if (calculatedTs > mainTokenExpTs) {
 | 
					                long calculatedTs = System.currentTimeMillis() + clientServerTimeDiff + AVG_REQUEST_TIMEOUT;
 | 
				
			||||||
                synchronized (RestClient.this) {
 | 
					                if (calculatedTs > mainTokenExpTs) {
 | 
				
			||||||
                    if (calculatedTs > mainTokenExpTs) {
 | 
					                    synchronized (RestClient.this) {
 | 
				
			||||||
                        if (calculatedTs < refreshTokenExpTs) {
 | 
					                        if (calculatedTs > mainTokenExpTs) {
 | 
				
			||||||
                            refreshToken();
 | 
					                            if (calculatedTs < refreshTokenExpTs) {
 | 
				
			||||||
                        } else {
 | 
					                                refreshToken();
 | 
				
			||||||
                            doLogin();
 | 
					                            } else {
 | 
				
			||||||
 | 
					                                doLogin();
 | 
				
			||||||
 | 
					                            }
 | 
				
			||||||
                        }
 | 
					                        }
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
 | 
					            } else {
 | 
				
			||||||
 | 
					                mainToken = accessToken;
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            wrapper.getHeaders().set(JWT_TOKEN_HEADER_PARAM, "Bearer " + mainToken);
 | 
					            wrapper.getHeaders().set(JWT_TOKEN_HEADER_PARAM, "Bearer " + mainToken);
 | 
				
			||||||
            return execution.execute(wrapper, bytes);
 | 
					            return execution.execute(wrapper, bytes);
 | 
				
			||||||
@ -2403,7 +2415,7 @@ public class RestClient implements Closeable {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public Future<List<AttributeKvEntry>> getAttributeKvEntriesAsync(EntityId entityId, List<String> keys) {
 | 
					    public Future<List<AttributeKvEntry>> getAttributeKvEntriesAsync(EntityId entityId, List<String> keys) {
 | 
				
			||||||
        return service.submit(() -> getAttributeKvEntries(entityId, keys));
 | 
					        return getExecutor().submit(() -> getAttributeKvEntries(entityId, keys));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public List<AttributeKvEntry> getAttributesByScope(EntityId entityId, String scope, List<String> keys) {
 | 
					    public List<AttributeKvEntry> getAttributesByScope(EntityId entityId, String scope, List<String> keys) {
 | 
				
			||||||
@ -2976,7 +2988,7 @@ public class RestClient implements Closeable {
 | 
				
			|||||||
        addWidgetInfoFiltersToParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList, params);
 | 
					        addWidgetInfoFiltersToParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList, params);
 | 
				
			||||||
        return restTemplate.exchange(
 | 
					        return restTemplate.exchange(
 | 
				
			||||||
                baseURL + "/api/widgetTypes?" + getUrlParams(pageLink) +
 | 
					                baseURL + "/api/widgetTypes?" + getUrlParams(pageLink) +
 | 
				
			||||||
                        getWidgetTypeInfoPageRequestUrlParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList),
 | 
					                getWidgetTypeInfoPageRequestUrlParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList),
 | 
				
			||||||
                HttpMethod.GET,
 | 
					                HttpMethod.GET,
 | 
				
			||||||
                HttpEntity.EMPTY,
 | 
					                HttpEntity.EMPTY,
 | 
				
			||||||
                new ParameterizedTypeReference<PageData<WidgetTypeInfo>>() {
 | 
					                new ParameterizedTypeReference<PageData<WidgetTypeInfo>>() {
 | 
				
			||||||
@ -3064,7 +3076,7 @@ public class RestClient implements Closeable {
 | 
				
			|||||||
        addWidgetInfoFiltersToParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList, params);
 | 
					        addWidgetInfoFiltersToParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList, params);
 | 
				
			||||||
        return restTemplate.exchange(
 | 
					        return restTemplate.exchange(
 | 
				
			||||||
                baseURL + "/api/widgetTypesInfos?widgetsBundleId={widgetsBundleId}&" + getUrlParams(pageLink) +
 | 
					                baseURL + "/api/widgetTypesInfos?widgetsBundleId={widgetsBundleId}&" + getUrlParams(pageLink) +
 | 
				
			||||||
                        getWidgetTypeInfoPageRequestUrlParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList),
 | 
					                getWidgetTypeInfoPageRequestUrlParams(tenantOnly, fullSearch, deprecatedFilter, widgetTypeList),
 | 
				
			||||||
                HttpMethod.GET,
 | 
					                HttpMethod.GET,
 | 
				
			||||||
                HttpEntity.EMPTY,
 | 
					                HttpEntity.EMPTY,
 | 
				
			||||||
                new ParameterizedTypeReference<PageData<WidgetTypeInfo>>() {
 | 
					                new ParameterizedTypeReference<PageData<WidgetTypeInfo>>() {
 | 
				
			||||||
@ -3765,7 +3777,7 @@ public class RestClient implements Closeable {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public PageData<TbResourceInfo> getImages(PageLink pageLink, boolean includeSystemImages) {
 | 
					    public PageData<TbResourceInfo> getImages(PageLink pageLink, boolean includeSystemImages) {
 | 
				
			||||||
       return this.getImages(pageLink, null, includeSystemImages);
 | 
					        return this.getImages(pageLink, null, includeSystemImages);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public PageData<TbResourceInfo> getImages(PageLink pageLink, ResourceSubType imageSubType, boolean includeSystemImages) {
 | 
					    public PageData<TbResourceInfo> getImages(PageLink pageLink, ResourceSubType imageSubType, boolean includeSystemImages) {
 | 
				
			||||||
@ -4175,7 +4187,14 @@ public class RestClient implements Closeable {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void close() {
 | 
					    public void close() {
 | 
				
			||||||
        service.shutdown();
 | 
					        if (executor.isInitialized()) {
 | 
				
			||||||
 | 
					            getExecutor().shutdown();
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @SneakyThrows
 | 
				
			||||||
 | 
					    private ExecutorService getExecutor() {
 | 
				
			||||||
 | 
					        return executor.get();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user