diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java index ab1d2bbc61..5fc4e6dab2 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java @@ -17,12 +17,18 @@ package org.thingsboard.server.service.install; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Profile; import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.AdminSettings; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; @@ -51,6 +57,7 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; @@ -78,12 +85,19 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.settings.AdminSettingsService; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetsBundleService; +import javax.annotation.Nullable; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @Service @Profile("install") @@ -93,6 +107,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { private static final ObjectMapper objectMapper = new ObjectMapper(); public static final String CUSTOMER_CRED = "customer"; public static final String DEFAULT_DEVICE_TYPE = "default"; + public static final String ACTIVITY_STATE = "active"; @Autowired private InstallScripts installScripts; @@ -133,11 +148,32 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { @Autowired private RuleChainService ruleChainService; + @Autowired + private TimeseriesService tsService; + + @Value("${state.persistToTelemetry:false}") + @Getter + private boolean persistActivityToTelemetry; + @Bean protected BCryptPasswordEncoder passwordEncoder() { return new BCryptPasswordEncoder(); } + private ExecutorService tsCallBackExecutor; + + @PostConstruct + public void initExecutor() { + tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("sys-loader-ts-callback")); + } + + @PreDestroy + public void shutdownExecutor() { + if (tsCallBackExecutor != null) { + tsCallBackExecutor.shutdownNow(); + } + } + @Override public void createSysAdmin() { createUser(Authority.SYS_ADMIN, null, null, "sysadmin@thingsboard.org", "sysadmin"); @@ -481,11 +517,62 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { device.setAdditionalInfo(additionalInfo); } device = deviceService.saveDevice(device); - //TODO: No access to cluster service, so we should manually update the status of device. + save(device.getId(), ACTIVITY_STATE, false); DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(TenantId.SYS_TENANT_ID, device.getId()); deviceCredentials.setCredentialsId(accessToken); deviceCredentialsService.updateDeviceCredentials(TenantId.SYS_TENANT_ID, deviceCredentials); return device; } + private void save(DeviceId deviceId, String key, boolean value) { + if (persistActivityToTelemetry) { + ListenableFuture saveFuture = tsService.save( + TenantId.SYS_TENANT_ID, + deviceId, + Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), 0L); + addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value)); + } else { + ListenableFuture> saveFuture = attributesService.save(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE, + Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value) + , System.currentTimeMillis()))); + addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value)); + } + } + + private static class TelemetrySaveCallback implements FutureCallback { + private final DeviceId deviceId; + private final String key; + private final Object value; + + TelemetrySaveCallback(DeviceId deviceId, String key, Object value) { + this.deviceId = deviceId; + this.key = key; + this.value = value; + } + + @Override + public void onSuccess(@Nullable T result) { + log.trace("[{}] Successfully updated attribute [{}] with value [{}]", deviceId, key, value); + } + + @Override + public void onFailure(Throwable t) { + log.warn("[{}] Failed to update attribute [{}] with value [{}]", deviceId, key, value, t); + } + } + + private void addTsCallback(ListenableFuture saveFuture, final FutureCallback callback) { + Futures.addCallback(saveFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable S result) { + callback.onSuccess(result); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }, tsCallBackExecutor); + } + }