added logs
This commit is contained in:
		
							parent
							
								
									f75eaf8226
								
							
						
					
					
						commit
						8079ef66ef
					
				@ -26,6 +26,9 @@ public class CalculatedFieldCtx {
 | 
			
		||||
    private EntityId entityId;
 | 
			
		||||
    private CalculatedFieldState state;
 | 
			
		||||
 | 
			
		||||
    public CalculatedFieldCtx() {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public CalculatedFieldCtx(CalculatedFieldId calculatedFieldId, EntityId entityId, CalculatedFieldState state) {
 | 
			
		||||
        this.calculatedFieldId = calculatedFieldId;
 | 
			
		||||
        this.entityId = entityId;
 | 
			
		||||
 | 
			
		||||
@ -24,6 +24,7 @@ import java.util.Map;
 | 
			
		||||
@Builder
 | 
			
		||||
public class CalculatedFieldState {
 | 
			
		||||
 | 
			
		||||
    // TODO: use value object(TsKv) instead of string
 | 
			
		||||
    Map<String, String> arguments;
 | 
			
		||||
    String result;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -67,7 +67,6 @@ import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Objects;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.Random;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.ConcurrentMap;
 | 
			
		||||
@ -113,7 +112,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
 | 
			
		||||
                Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field"));
 | 
			
		||||
        calculatedFieldCallbackExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(
 | 
			
		||||
                Math.max(4, Runtime.getRuntime().availableProcessors()), "calculated-field-callback"));
 | 
			
		||||
        scheduledExecutor.scheduleWithFixedDelay(this::fetchCalculatedFields, new Random().nextInt(defaultCalculatedFieldCheckIntervalInSec), defaultCalculatedFieldCheckIntervalInSec, TimeUnit.SECONDS);
 | 
			
		||||
        scheduledExecutor.scheduleWithFixedDelay(this::fetchCalculatedFields, 0, defaultCalculatedFieldCheckIntervalInSec, TimeUnit.SECONDS);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @PreDestroy
 | 
			
		||||
@ -136,11 +135,15 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
 | 
			
		||||
        try {
 | 
			
		||||
            TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
 | 
			
		||||
            CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(proto.getCalculatedFieldIdMSB(), proto.getCalculatedFieldIdLSB()));
 | 
			
		||||
            log.info("Received CalculatedFieldMsgProto for processing: tenantId=[{}], calculatedFieldId=[{}]", tenantId, calculatedFieldId);
 | 
			
		||||
            if (proto.getDeleted()) {
 | 
			
		||||
                log.warn("Executing onCalculatedFieldDelete, calculatedFieldId=[{}]", calculatedFieldId);
 | 
			
		||||
                onCalculatedFieldDelete(calculatedFieldId, callback);
 | 
			
		||||
                callback.onSuccess();
 | 
			
		||||
            }
 | 
			
		||||
            if (proto.getUpdated()) {
 | 
			
		||||
                log.info("Executing onCalculatedFieldUpdate, calculatedFieldId=[{}]", calculatedFieldId);
 | 
			
		||||
                //TODO: improve the check. Maybe it was renamed or just the name of the result changed.
 | 
			
		||||
                onCalculatedFieldDelete(calculatedFieldId, callback);
 | 
			
		||||
            }
 | 
			
		||||
            CalculatedField cf = calculatedFieldService.findById(tenantId, calculatedFieldId);
 | 
			
		||||
@ -150,13 +153,18 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
 | 
			
		||||
                calculatedFields.put(calculatedFieldId, cf);
 | 
			
		||||
                calculatedFieldLinks.put(calculatedFieldId, links);
 | 
			
		||||
                switch (entityId.getEntityType()) {
 | 
			
		||||
                    case ASSET, DEVICE -> initializeStateForEntity(tenantId, cf, entityId, callback);
 | 
			
		||||
                    case ASSET, DEVICE -> {
 | 
			
		||||
                        log.info("Initializing state for entity: tenantId=[{}], entityId=[{}]", tenantId, entityId);
 | 
			
		||||
                        initializeStateForEntity(tenantId, cf, entityId, callback);
 | 
			
		||||
                    }
 | 
			
		||||
                    case ASSET_PROFILE -> {
 | 
			
		||||
                        log.info("Initializing state for all assets in profile: tenantId=[{}], assetProfileId=[{}]", tenantId, entityId);
 | 
			
		||||
                        PageDataIterable<AssetId> assetIds = new PageDataIterable<>(pageLink ->
 | 
			
		||||
                                assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) entityId, pageLink), initFetchPackSize);
 | 
			
		||||
                        assetIds.forEach(assetId -> initializeStateForEntity(tenantId, cf, assetId, callback));
 | 
			
		||||
                    }
 | 
			
		||||
                    case DEVICE_PROFILE -> {
 | 
			
		||||
                        log.info("Initializing state for all devices in profile: tenantId=[{}], deviceProfileId=[{}]", tenantId, entityId);
 | 
			
		||||
                        PageDataIterable<DeviceId> deviceIds = new PageDataIterable<>(pageLink ->
 | 
			
		||||
                                deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityId, pageLink), initFetchPackSize);
 | 
			
		||||
                        deviceIds.forEach(deviceId -> initializeStateForEntity(tenantId, cf, deviceId, callback));
 | 
			
		||||
@ -164,12 +172,14 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
 | 
			
		||||
                    default ->
 | 
			
		||||
                            throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields.");
 | 
			
		||||
                }
 | 
			
		||||
                log.info("Successfully processed calculated field message for calculatedFieldId: [{}]", calculatedFieldId);
 | 
			
		||||
            } else {
 | 
			
		||||
                //Calculated field or entity was probably deleted while message was in queue;
 | 
			
		||||
                //Calculated field was probably deleted while message was in queue;
 | 
			
		||||
                log.warn("Calculated field not found, possibly deleted: {}", calculatedFieldId);
 | 
			
		||||
                callback.onSuccess();
 | 
			
		||||
            }
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.trace("Failed to process calculated field add msg: [{}]", proto, e);
 | 
			
		||||
            log.trace("Failed to process calculated field msg: [{}]", proto, e);
 | 
			
		||||
            callback.onFailure(e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -210,7 +220,6 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    private void onCalculatedFieldDelete(CalculatedFieldId calculatedFieldId, TbCallback callback) {
 | 
			
		||||
        try {
 | 
			
		||||
            calculatedFieldLinks.remove(calculatedFieldId);
 | 
			
		||||
@ -221,7 +230,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
 | 
			
		||||
                    .collect(Collectors.toList());
 | 
			
		||||
            rocksDBService.deleteAll(statesToRemove);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            log.trace("Failed to delete calculated field.", e);
 | 
			
		||||
            log.trace("Failed to delete calculated field: [{}]", calculatedFieldId, e);
 | 
			
		||||
            callback.onFailure(e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
@ -231,7 +240,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
 | 
			
		||||
        cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf));
 | 
			
		||||
        PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize);
 | 
			
		||||
        cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new ArrayList<>()).add(link));
 | 
			
		||||
        rocksDBService.getAll().forEach((ctxId, ctx) -> states.put(ctxId, JacksonUtil.convertValue(ctx, CalculatedFieldCtx.class)));
 | 
			
		||||
        rocksDBService.getAll().forEach((ctxId, ctx) -> states.put(ctxId, JacksonUtil.fromString(ctx, CalculatedFieldCtx.class)));
 | 
			
		||||
        states.keySet().removeIf(ctxId -> calculatedFields.keySet().stream().noneMatch(id -> ctxId.startsWith(id.toString())));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -276,7 +285,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
 | 
			
		||||
 | 
			
		||||
            @Override
 | 
			
		||||
            public void onFailure(Throwable t) {
 | 
			
		||||
                log.warn("Failed to fetch data for type: {}", argument.getType(), t);
 | 
			
		||||
                log.warn("Failed to initialize state for entity: [{}]", entityId, t);
 | 
			
		||||
                callback.onFailure(t);
 | 
			
		||||
            }
 | 
			
		||||
        }, calculatedFieldCallbackExecutor));
 | 
			
		||||
@ -327,7 +336,7 @@ public class DefaultTbCalculatedFieldService extends AbstractTbEntityService imp
 | 
			
		||||
        }
 | 
			
		||||
        calculatedFieldCtx.setState(state);
 | 
			
		||||
        states.put(ctxId, calculatedFieldCtx);
 | 
			
		||||
        rocksDBService.put(ctxId, Objects.requireNonNull(JacksonUtil.toString(calculatedFieldCtx)));
 | 
			
		||||
        rocksDBService.put(ctxId, Objects.requireNonNull(JacksonUtil.writeValueAsString(calculatedFieldCtx)));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private String performCalculation(Map<String, String> argumentValues, CalculatedFieldConfiguration calculatedFieldConfiguration) {
 | 
			
		||||
 | 
			
		||||
@ -41,7 +41,6 @@ public interface CalculatedFieldConfiguration {
 | 
			
		||||
 | 
			
		||||
    Map<String, BaseCalculatedFieldConfiguration.Argument> getArguments();
 | 
			
		||||
 | 
			
		||||
    @JsonIgnore
 | 
			
		||||
    BaseCalculatedFieldConfiguration.Output getOutput();
 | 
			
		||||
 | 
			
		||||
    @JsonIgnore
 | 
			
		||||
 | 
			
		||||
@ -56,7 +56,7 @@ public class CalculatedFieldEntity extends BaseSqlEntity<CalculatedField> implem
 | 
			
		||||
    private UUID tenantId;
 | 
			
		||||
 | 
			
		||||
    @Column(name = CALCULATED_FIELD_ENTITY_TYPE)
 | 
			
		||||
    private EntityType entityType;
 | 
			
		||||
    private String entityType;
 | 
			
		||||
 | 
			
		||||
    @Column(name = CALCULATED_FIELD_ENTITY_ID)
 | 
			
		||||
    private UUID entityId;
 | 
			
		||||
@ -88,12 +88,12 @@ public class CalculatedFieldEntity extends BaseSqlEntity<CalculatedField> implem
 | 
			
		||||
        this.setUuid(calculatedField.getUuidId());
 | 
			
		||||
        this.createdTime = calculatedField.getCreatedTime();
 | 
			
		||||
        this.tenantId = calculatedField.getTenantId().getId();
 | 
			
		||||
        this.entityType = calculatedField.getEntityId().getEntityType();
 | 
			
		||||
        this.entityType = calculatedField.getEntityId().getEntityType().name();
 | 
			
		||||
        this.entityId = calculatedField.getEntityId().getId();
 | 
			
		||||
        this.type = calculatedField.getType();
 | 
			
		||||
        this.name = calculatedField.getName();
 | 
			
		||||
        this.configurationVersion = calculatedField.getConfigurationVersion();
 | 
			
		||||
        this.configuration = calculatedField.getConfiguration().calculatedFieldConfigToJson(entityType, entityId);
 | 
			
		||||
        this.configuration = calculatedField.getConfiguration().calculatedFieldConfigToJson(EntityType.valueOf(entityType), entityId);
 | 
			
		||||
        this.version = calculatedField.getVersion();
 | 
			
		||||
        if (calculatedField.getExternalId() != null) {
 | 
			
		||||
            this.externalId = calculatedField.getExternalId().getId();
 | 
			
		||||
@ -109,7 +109,7 @@ public class CalculatedFieldEntity extends BaseSqlEntity<CalculatedField> implem
 | 
			
		||||
        calculatedField.setType(type);
 | 
			
		||||
        calculatedField.setName(name);
 | 
			
		||||
        calculatedField.setConfigurationVersion(configurationVersion);
 | 
			
		||||
        calculatedField.setConfiguration(readCalculatedFieldConfiguration(configuration, entityType, entityId));
 | 
			
		||||
        calculatedField.setConfiguration(readCalculatedFieldConfiguration(configuration, EntityType.valueOf(entityType), entityId));
 | 
			
		||||
        calculatedField.setVersion(version);
 | 
			
		||||
        if (externalId != null) {
 | 
			
		||||
            calculatedField.setExternalId(new CalculatedFieldId(externalId));
 | 
			
		||||
 | 
			
		||||
@ -53,7 +53,7 @@ public class CalculatedFieldLinkEntity extends BaseSqlEntity<CalculatedFieldLink
 | 
			
		||||
    private UUID tenantId;
 | 
			
		||||
 | 
			
		||||
    @Column(name = CALCULATED_FIELD_LINK_ENTITY_TYPE)
 | 
			
		||||
    private EntityType entityType;
 | 
			
		||||
    private String entityType;
 | 
			
		||||
 | 
			
		||||
    @Column(name = CALCULATED_FIELD_LINK_ENTITY_ID)
 | 
			
		||||
    private UUID entityId;
 | 
			
		||||
@ -73,7 +73,7 @@ public class CalculatedFieldLinkEntity extends BaseSqlEntity<CalculatedFieldLink
 | 
			
		||||
        this.setUuid(calculatedFieldLink.getUuidId());
 | 
			
		||||
        this.createdTime = calculatedFieldLink.getCreatedTime();
 | 
			
		||||
        this.tenantId = calculatedFieldLink.getTenantId().getId();
 | 
			
		||||
        this.entityType = calculatedFieldLink.getEntityId().getEntityType();
 | 
			
		||||
        this.entityType = calculatedFieldLink.getEntityId().getEntityType().name();
 | 
			
		||||
        this.entityId = calculatedFieldLink.getEntityId().getId();
 | 
			
		||||
        this.calculatedFieldId = calculatedFieldLink.getCalculatedFieldId().getId();
 | 
			
		||||
        this.configuration = JacksonUtil.valueToTree(calculatedFieldLink.getConfiguration());
 | 
			
		||||
 | 
			
		||||
@ -76,7 +76,7 @@ public class DefaultNativeCalculatedFieldRepository implements NativeCalculatedF
 | 
			
		||||
                String type = (String) row.get("type");
 | 
			
		||||
                String name = (String) row.get("name");
 | 
			
		||||
                int configurationVersion = (int) row.get("configuration_version");
 | 
			
		||||
                JsonNode configuration = JacksonUtil.valueToTree(row.get("configuration"));
 | 
			
		||||
                JsonNode configuration = JacksonUtil.toJsonNode((String) row.get("configuration"));
 | 
			
		||||
                long version = (long) row.get("version");
 | 
			
		||||
                Object externalIdObj = row.get("external_id");
 | 
			
		||||
 | 
			
		||||
@ -117,7 +117,7 @@ public class DefaultNativeCalculatedFieldRepository implements NativeCalculatedF
 | 
			
		||||
                EntityType entityType = EntityType.valueOf((String) row.get("entity_type"));
 | 
			
		||||
                UUID entityId = (UUID) row.get("entity_id");
 | 
			
		||||
                UUID calculatedFieldId = (UUID) row.get("calculated_field_id");
 | 
			
		||||
                JsonNode configuration = JacksonUtil.valueToTree(row.get("configuration"));
 | 
			
		||||
                JsonNode configuration = JacksonUtil.toJsonNode((String) row.get("configuration"));
 | 
			
		||||
 | 
			
		||||
                CalculatedFieldLink calculatedFieldLink = new CalculatedFieldLink();
 | 
			
		||||
                calculatedFieldLink.setId(new CalculatedFieldLinkId(id));
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user