cache refactoring
This commit is contained in:
parent
03c3341265
commit
46180e33d7
@ -28,20 +28,24 @@ import java.util.Set;
|
|||||||
|
|
||||||
public interface CalculatedFieldCache {
|
public interface CalculatedFieldCache {
|
||||||
|
|
||||||
CalculatedField getCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId);
|
CalculatedField getCalculatedField(CalculatedFieldId calculatedFieldId);
|
||||||
|
|
||||||
List<CalculatedField> getCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId);
|
List<CalculatedField> getCalculatedFieldsByEntityId(EntityId entityId);
|
||||||
|
|
||||||
List<CalculatedFieldLink> getCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId);
|
List<CalculatedFieldLink> getCalculatedFieldLinks(CalculatedFieldId calculatedFieldId);
|
||||||
|
|
||||||
List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId);
|
List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(EntityId entityId);
|
||||||
|
|
||||||
void updateCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId);
|
void updateCalculatedFieldLinks(CalculatedFieldId calculatedFieldId);
|
||||||
|
|
||||||
CalculatedFieldCtx getCalculatedFieldCtx(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService);
|
CalculatedFieldCtx getCalculatedFieldCtx(CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService);
|
||||||
|
|
||||||
Set<EntityId> getEntitiesByProfile(TenantId tenantId, EntityId entityId);
|
Set<EntityId> getEntitiesByProfile(TenantId tenantId, EntityId entityId);
|
||||||
|
|
||||||
|
void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId);
|
||||||
|
|
||||||
|
void updateCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId);
|
||||||
|
|
||||||
void evict(CalculatedFieldId calculatedFieldId);
|
void evict(CalculatedFieldId calculatedFieldId);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -24,6 +24,7 @@ import org.springframework.stereotype.Service;
|
|||||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
||||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||||
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
|
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
|
||||||
|
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
|
||||||
import org.thingsboard.server.common.data.id.AssetProfileId;
|
import org.thingsboard.server.common.data.id.AssetProfileId;
|
||||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||||
@ -70,98 +71,44 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
|||||||
public void init() {
|
public void init() {
|
||||||
PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize);
|
PageDataIterable<CalculatedField> cfs = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFields, initFetchPackSize);
|
||||||
cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf));
|
cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf));
|
||||||
|
calculatedFields.values().forEach(cf ->
|
||||||
|
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new ArrayList<>()).add(cf)
|
||||||
|
);
|
||||||
PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize);
|
PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize);
|
||||||
cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new ArrayList<>()).add(link));
|
cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new ArrayList<>()).add(link));
|
||||||
|
calculatedFieldLinks.values().stream()
|
||||||
|
.flatMap(List::stream)
|
||||||
|
.forEach(link ->
|
||||||
|
entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new ArrayList<>()).add(link)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CalculatedField getCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
|
public CalculatedField getCalculatedField(CalculatedFieldId calculatedFieldId) {
|
||||||
CalculatedField calculatedField = calculatedFields.get(calculatedFieldId);
|
return calculatedFields.get(calculatedFieldId);
|
||||||
if (calculatedField == null) {
|
|
||||||
calculatedFieldFetchLock.lock();
|
|
||||||
try {
|
|
||||||
calculatedField = calculatedFields.get(calculatedFieldId);
|
|
||||||
if (calculatedField == null) {
|
|
||||||
calculatedField = calculatedFieldService.findById(tenantId, calculatedFieldId);
|
|
||||||
if (calculatedField != null) {
|
|
||||||
calculatedFields.put(calculatedFieldId, calculatedField);
|
|
||||||
log.debug("[{}] Fetch calculated field into cache: {}", calculatedFieldId, calculatedField);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
calculatedFieldFetchLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.trace("[{}] Found calculated field in cache: {}", calculatedFieldId, calculatedField);
|
|
||||||
return calculatedField;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<CalculatedField> getCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId) {
|
public List<CalculatedField> getCalculatedFieldsByEntityId(EntityId entityId) {
|
||||||
List<CalculatedField> cfs = entityIdCalculatedFields.get(entityId);
|
return entityIdCalculatedFields.getOrDefault(entityId, new ArrayList<>());
|
||||||
if (cfs == null) {
|
|
||||||
calculatedFieldFetchLock.lock();
|
|
||||||
try {
|
|
||||||
cfs = entityIdCalculatedFields.get(entityId);
|
|
||||||
if (cfs == null) {
|
|
||||||
cfs = calculatedFieldService.findCalculatedFieldsByEntityId(tenantId, entityId);
|
|
||||||
entityIdCalculatedFields.put(entityId, cfs);
|
|
||||||
log.debug("[{}] Fetch calculated fields by entity into cache: {}", entityId, cfs);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
calculatedFieldFetchLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.trace("[{}] Found calculated fields by entity in cache: {}", entityId, cfs);
|
|
||||||
return cfs;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<CalculatedFieldLink> getCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
|
public List<CalculatedFieldLink> getCalculatedFieldLinks(CalculatedFieldId calculatedFieldId) {
|
||||||
List<CalculatedFieldLink> cfLinks = calculatedFieldLinks.get(calculatedFieldId);
|
return calculatedFieldLinks.getOrDefault(calculatedFieldId, new ArrayList<>());
|
||||||
if (cfLinks == null) {
|
|
||||||
calculatedFieldFetchLock.lock();
|
|
||||||
try {
|
|
||||||
cfLinks = calculatedFieldLinks.get(calculatedFieldId);
|
|
||||||
if (cfLinks == null) {
|
|
||||||
cfLinks = calculatedFieldService.findAllCalculatedFieldLinksById(tenantId, calculatedFieldId);
|
|
||||||
calculatedFieldLinks.put(calculatedFieldId, cfLinks);
|
|
||||||
log.debug("[{}] Fetch calculated field links into cache: {}", calculatedFieldId, cfLinks);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
calculatedFieldFetchLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.trace("[{}] Found calculated field links in cache: {}", calculatedFieldId, cfLinks);
|
|
||||||
return cfLinks;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(TenantId tenantId, EntityId entityId) {
|
public List<CalculatedFieldLink> getCalculatedFieldLinksByEntityId(EntityId entityId) {
|
||||||
List<CalculatedFieldLink> cfLinks = entityIdCalculatedFieldLinks.get(entityId);
|
return entityIdCalculatedFieldLinks.getOrDefault(entityId, new ArrayList<>());
|
||||||
if (cfLinks == null) {
|
|
||||||
calculatedFieldFetchLock.lock();
|
|
||||||
try {
|
|
||||||
cfLinks = entityIdCalculatedFieldLinks.get(entityId);
|
|
||||||
if (cfLinks == null) {
|
|
||||||
cfLinks = calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId);
|
|
||||||
entityIdCalculatedFieldLinks.put(entityId, cfLinks);
|
|
||||||
log.debug("[{}] Fetch calculated field links by entity id into cache: {}", entityId, cfLinks);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
calculatedFieldFetchLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.trace("[{}] Found calculated field links by entity id in cache: {}", entityId, cfLinks);
|
|
||||||
return cfLinks;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateCalculatedFieldLinks(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
|
public void updateCalculatedFieldLinks(CalculatedFieldId calculatedFieldId) {
|
||||||
log.debug("Update calculated field links per entity for calculated field: [{}]", calculatedFieldId);
|
log.debug("Update calculated field links per entity for calculated field: [{}]", calculatedFieldId);
|
||||||
calculatedFieldFetchLock.lock();
|
calculatedFieldFetchLock.lock();
|
||||||
try {
|
try {
|
||||||
List<CalculatedFieldLink> cfLinks = getCalculatedFieldLinks(tenantId, calculatedFieldId);
|
List<CalculatedFieldLink> cfLinks = getCalculatedFieldLinks(calculatedFieldId);
|
||||||
if (cfLinks != null && !cfLinks.isEmpty()) {
|
if (cfLinks != null && !cfLinks.isEmpty()) {
|
||||||
cfLinks.forEach(link -> {
|
cfLinks.forEach(link -> {
|
||||||
entityIdCalculatedFieldLinks.compute(link.getEntityId(), (id, existingList) -> {
|
entityIdCalculatedFieldLinks.compute(link.getEntityId(), (id, existingList) -> {
|
||||||
@ -181,14 +128,14 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CalculatedFieldCtx getCalculatedFieldCtx(TenantId tenantId, CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService) {
|
public CalculatedFieldCtx getCalculatedFieldCtx(CalculatedFieldId calculatedFieldId, TbelInvokeService tbelInvokeService) {
|
||||||
CalculatedFieldCtx ctx = calculatedFieldsCtx.get(calculatedFieldId);
|
CalculatedFieldCtx ctx = calculatedFieldsCtx.get(calculatedFieldId);
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
calculatedFieldFetchLock.lock();
|
calculatedFieldFetchLock.lock();
|
||||||
try {
|
try {
|
||||||
ctx = calculatedFieldsCtx.get(calculatedFieldId);
|
ctx = calculatedFieldsCtx.get(calculatedFieldId);
|
||||||
if (ctx == null) {
|
if (ctx == null) {
|
||||||
CalculatedField calculatedField = getCalculatedField(tenantId, calculatedFieldId);
|
CalculatedField calculatedField = getCalculatedField(calculatedFieldId);
|
||||||
if (calculatedField != null) {
|
if (calculatedField != null) {
|
||||||
ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService);
|
ctx = new CalculatedFieldCtx(calculatedField, tbelInvokeService);
|
||||||
calculatedFieldsCtx.put(calculatedFieldId, ctx);
|
calculatedFieldsCtx.put(calculatedFieldId, ctx);
|
||||||
@ -236,6 +183,42 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
|||||||
return entities;
|
return entities;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
|
||||||
|
calculatedFieldFetchLock.lock();
|
||||||
|
try {
|
||||||
|
CalculatedField calculatedField = calculatedFieldService.findById(tenantId, calculatedFieldId);
|
||||||
|
EntityId cfEntityId = calculatedField.getEntityId();
|
||||||
|
|
||||||
|
calculatedFields.put(calculatedFieldId, calculatedField);
|
||||||
|
|
||||||
|
entityIdCalculatedFields.computeIfAbsent(cfEntityId, entityId -> new ArrayList<>()).add(calculatedField);
|
||||||
|
|
||||||
|
CalculatedFieldConfiguration configuration = calculatedField.getConfiguration();
|
||||||
|
calculatedFieldLinks.put(calculatedFieldId, configuration.buildCalculatedFieldLinks(tenantId, cfEntityId, calculatedFieldId));
|
||||||
|
|
||||||
|
configuration.getReferencedEntities().stream()
|
||||||
|
.filter(referencedEntityId -> !referencedEntityId.equals(cfEntityId))
|
||||||
|
.forEach(referencedEntityId -> {
|
||||||
|
entityIdCalculatedFieldLinks.computeIfAbsent(referencedEntityId, entityId -> new ArrayList<>())
|
||||||
|
.add(configuration.buildCalculatedFieldLink(tenantId, referencedEntityId, calculatedFieldId));
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
calculatedFieldFetchLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
|
||||||
|
calculatedFieldFetchLock.lock();
|
||||||
|
try {
|
||||||
|
evict(calculatedFieldId);
|
||||||
|
addCalculatedField(tenantId, calculatedFieldId);
|
||||||
|
} finally {
|
||||||
|
calculatedFieldFetchLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void evict(CalculatedFieldId calculatedFieldId) {
|
public void evict(CalculatedFieldId calculatedFieldId) {
|
||||||
CalculatedField oldCalculatedField = calculatedFields.remove(calculatedFieldId);
|
CalculatedField oldCalculatedField = calculatedFields.remove(calculatedFieldId);
|
||||||
@ -243,7 +226,6 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
|||||||
calculatedFieldLinks.remove(calculatedFieldId);
|
calculatedFieldLinks.remove(calculatedFieldId);
|
||||||
log.debug("[{}] evict calculated field from cached calculated fields by entity id: {}", calculatedFieldId, oldCalculatedField);
|
log.debug("[{}] evict calculated field from cached calculated fields by entity id: {}", calculatedFieldId, oldCalculatedField);
|
||||||
entityIdCalculatedFields.forEach((entityId, calculatedFields) -> calculatedFields.removeIf(cf -> cf.getId().equals(calculatedFieldId)));
|
entityIdCalculatedFields.forEach((entityId, calculatedFields) -> calculatedFields.removeIf(cf -> cf.getId().equals(calculatedFieldId)));
|
||||||
entityIdCalculatedFields.remove(oldCalculatedField.getEntityId());
|
|
||||||
log.debug("[{}] evict calculated field links from cache: {}", calculatedFieldId, oldCalculatedField);
|
log.debug("[{}] evict calculated field links from cache: {}", calculatedFieldId, oldCalculatedField);
|
||||||
calculatedFieldsCtx.remove(calculatedFieldId);
|
calculatedFieldsCtx.remove(calculatedFieldId);
|
||||||
log.debug("[{}] evict calculated field ctx from cache: {}", calculatedFieldId, oldCalculatedField);
|
log.debug("[{}] evict calculated field ctx from cache: {}", calculatedFieldId, oldCalculatedField);
|
||||||
|
|||||||
@ -92,6 +92,7 @@ import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
|||||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -253,7 +254,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
onCalculatedFieldDelete(calculatedFieldId, callback);
|
onCalculatedFieldDelete(calculatedFieldId, callback);
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
}
|
}
|
||||||
CalculatedField cf = calculatedFieldCache.getCalculatedField(tenantId, calculatedFieldId);
|
CalculatedField cf = calculatedFieldCache.getCalculatedField(calculatedFieldId);
|
||||||
if (proto.getUpdated()) {
|
if (proto.getUpdated()) {
|
||||||
log.info("Executing onCalculatedFieldUpdate, calculatedFieldId=[{}]", calculatedFieldId);
|
log.info("Executing onCalculatedFieldUpdate, calculatedFieldId=[{}]", calculatedFieldId);
|
||||||
boolean shouldReinit = onCalculatedFieldUpdate(cf, callback);
|
boolean shouldReinit = onCalculatedFieldUpdate(cf, callback);
|
||||||
@ -263,7 +264,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
}
|
}
|
||||||
if (cf != null) {
|
if (cf != null) {
|
||||||
EntityId entityId = cf.getEntityId();
|
EntityId entityId = cf.getEntityId();
|
||||||
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService);
|
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId, tbelInvokeService);
|
||||||
switch (entityId.getEntityType()) {
|
switch (entityId.getEntityType()) {
|
||||||
case ASSET, DEVICE -> {
|
case ASSET, DEVICE -> {
|
||||||
log.info("Initializing state for entity: tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
log.info("Initializing state for entity: tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
||||||
@ -297,7 +298,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) {
|
private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) {
|
||||||
CalculatedField oldCalculatedField = calculatedFieldCache.getCalculatedField(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId());
|
CalculatedField oldCalculatedField = calculatedFieldCache.getCalculatedField(updatedCalculatedField.getId());
|
||||||
boolean shouldReinit = true;
|
boolean shouldReinit = true;
|
||||||
if (hasSignificantChanges(oldCalculatedField, updatedCalculatedField)) {
|
if (hasSignificantChanges(oldCalculatedField, updatedCalculatedField)) {
|
||||||
onCalculatedFieldDelete(updatedCalculatedField.getId(), callback);
|
onCalculatedFieldDelete(updatedCalculatedField.getId(), callback);
|
||||||
@ -345,17 +346,27 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
|
|
||||||
if (supportedReferencedEntities.contains(entityId.getEntityType())) {
|
if (supportedReferencedEntities.contains(entityId.getEntityType())) {
|
||||||
TenantId tenantId = calculatedFieldTelemetryUpdateRequest.getTenantId();
|
TenantId tenantId = calculatedFieldTelemetryUpdateRequest.getTenantId();
|
||||||
Map<TopicPartitionInfo, List<CalculatedFieldEntityCtxId>> tpiStatesToUpdate = new HashMap<>();
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
|
||||||
|
|
||||||
updateTelemetryForEntity(calculatedFieldTelemetryUpdateRequest, tpiStatesToUpdate);
|
if (tpi.isMyPartition()) {
|
||||||
updateTelemetryForProfile(calculatedFieldTelemetryUpdateRequest, getProfileId(tenantId, entityId), tpiStatesToUpdate);
|
|
||||||
updateTelemetryForLinkedEntities(calculatedFieldTelemetryUpdateRequest, tpiStatesToUpdate);
|
|
||||||
|
|
||||||
if (!tpiStatesToUpdate.isEmpty()) {
|
processCalculatedFields(calculatedFieldTelemetryUpdateRequest, entityId);
|
||||||
tpiStatesToUpdate.forEach((topicPartitionInfo, ctxIds) -> {
|
processCalculatedFields(calculatedFieldTelemetryUpdateRequest, getProfileId(tenantId, entityId));
|
||||||
TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(calculatedFieldTelemetryUpdateRequest, ctxIds);
|
|
||||||
clusterService.pushMsgToRuleEngine(topicPartitionInfo, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder().setCfTelemetryUpdateMsg(telemetryUpdateMsgProto).build(), null);
|
Map<TopicPartitionInfo, List<CalculatedFieldEntityCtxId>> tpiStatesToUpdate = new HashMap<>();
|
||||||
});
|
processCalculatedFieldLinks(calculatedFieldTelemetryUpdateRequest, tpiStatesToUpdate);
|
||||||
|
if (!tpiStatesToUpdate.isEmpty()) {
|
||||||
|
tpiStatesToUpdate.forEach((topicPartitionInfo, ctxIds) -> {
|
||||||
|
TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(calculatedFieldTelemetryUpdateRequest, ctxIds);
|
||||||
|
clusterService.pushMsgToRuleEngine(topicPartitionInfo, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||||
|
.setCfTelemetryUpdateMsg(telemetryUpdateMsgProto).build(), null);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(calculatedFieldTelemetryUpdateRequest);
|
||||||
|
clusterService.pushMsgToRuleEngine(tpi, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||||
|
.setCfTelemetryUpdateMsg(telemetryUpdateMsgProto).build(), null);
|
||||||
|
// Forward this request to a correct server based on entity id.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -363,30 +374,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateTelemetryForEntity(CalculatedFieldTelemetryUpdateRequest request, Map<TopicPartitionInfo, List<CalculatedFieldEntityCtxId>> tpiStates) {
|
private void processCalculatedFields(CalculatedFieldTelemetryUpdateRequest request, EntityId cfTargetEntityId) {
|
||||||
updateTelemetryForEntity(request, request.getEntityId(), tpiStates);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void updateTelemetryForProfile(CalculatedFieldTelemetryUpdateRequest request, EntityId profileId, Map<TopicPartitionInfo, List<CalculatedFieldEntityCtxId>> tpiStates) {
|
|
||||||
updateTelemetryForEntity(request, profileId, tpiStates);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void updateTelemetryForEntity(CalculatedFieldTelemetryUpdateRequest request, EntityId targetEntity, Map<TopicPartitionInfo, List<CalculatedFieldEntityCtxId>> tpiStates) {
|
|
||||||
TenantId tenantId = request.getTenantId();
|
TenantId tenantId = request.getTenantId();
|
||||||
EntityId entityId = request.getEntityId();
|
EntityId entityId = request.getEntityId();
|
||||||
|
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
|
if (cfTargetEntityId != null) {
|
||||||
if (tpi.isMyPartition()) {
|
calculatedFieldCache.getCalculatedFieldsByEntityId(cfTargetEntityId).forEach(cf -> {
|
||||||
if (targetEntity != null) {
|
CalculatedFieldLinkConfiguration linkConfiguration = cf.getConfiguration().getReferencedEntityConfig(cfTargetEntityId);
|
||||||
calculatedFieldCache.getCalculatedFieldsByEntityId(tenantId, targetEntity).forEach(cf -> {
|
mapAndProcessUpdatedTelemetry(tenantId, entityId, cf.getId(), request, linkConfiguration);
|
||||||
CalculatedFieldLinkConfiguration linkConfiguration = cf.getConfiguration().getReferencedEntityConfig(targetEntity);
|
|
||||||
mapAndProcessUpdatedTelemetry(tenantId, entityId, cf.getId(), request, linkConfiguration);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
List<CalculatedFieldEntityCtxId> ctxIds = tpiStates.computeIfAbsent(tpi, k -> new ArrayList<>());
|
|
||||||
calculatedFieldCache.getCalculatedFieldsByEntityId(tenantId, targetEntity).forEach(cf -> {
|
|
||||||
ctxIds.add(new CalculatedFieldEntityCtxId(cf.getId(), entityId));
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -405,14 +400,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateTelemetryForLinkedEntities(CalculatedFieldTelemetryUpdateRequest request, Map<TopicPartitionInfo, List<CalculatedFieldEntityCtxId>> tpiStates) {
|
private void processCalculatedFieldLinks(CalculatedFieldTelemetryUpdateRequest request, Map<TopicPartitionInfo, List<CalculatedFieldEntityCtxId>> tpiStates) {
|
||||||
TenantId tenantId = request.getTenantId();
|
TenantId tenantId = request.getTenantId();
|
||||||
EntityId entityId = request.getEntityId();
|
EntityId entityId = request.getEntityId();
|
||||||
|
|
||||||
calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId)
|
calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId)
|
||||||
.forEach(link -> {
|
.forEach(link -> {
|
||||||
CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId();
|
CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId();
|
||||||
EntityId targetEntityId = calculatedFieldCache.getCalculatedField(tenantId, calculatedFieldId).getEntityId();
|
EntityId targetEntityId = calculatedFieldCache.getCalculatedField(calculatedFieldId).getEntityId();
|
||||||
|
|
||||||
if (isProfileEntity(targetEntityId)) {
|
if (isProfileEntity(targetEntityId)) {
|
||||||
calculatedFieldCache.getEntitiesByProfile(tenantId, targetEntityId).forEach(entityByProfile -> {
|
calculatedFieldCache.getEntitiesByProfile(tenantId, targetEntityId).forEach(entityByProfile -> {
|
||||||
@ -451,33 +446,22 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
@Override
|
@Override
|
||||||
public void onTelemetryUpdateMsg(TransportProtos.TelemetryUpdateMsgProto proto) {
|
public void onTelemetryUpdateMsg(TransportProtos.TelemetryUpdateMsgProto proto) {
|
||||||
try {
|
try {
|
||||||
TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
|
CalculatedFieldTelemetryUpdateRequest request = fromProto(proto);
|
||||||
|
|
||||||
|
if (proto.getLinksList().isEmpty()) {
|
||||||
|
onTelemetryUpdate(request);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
proto.getLinksList().forEach(ctxIdProto -> {
|
proto.getLinksList().forEach(ctxIdProto -> {
|
||||||
EntityId entityId = EntityIdFactory.getByTypeAndUuid(
|
TenantId tenantId = request.getTenantId();
|
||||||
ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB()));
|
EntityId entityId = request.getEntityId();
|
||||||
|
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB()));
|
||||||
|
|
||||||
List<KvEntry> updatedTelemetry = proto.getUpdatedTelemetryList().stream()
|
CalculatedFieldLinkConfiguration linkConfiguration
|
||||||
.map(ProtoUtils::fromTelemetryProto)
|
= calculatedFieldCache.getCalculatedField(calculatedFieldId).getConfiguration().getReferencedEntityConfig(entityId);
|
||||||
.toList();
|
|
||||||
|
|
||||||
boolean attributesUpdated = StringUtils.isEmpty(proto.getScope());
|
mapAndProcessUpdatedTelemetry(tenantId, entityId, calculatedFieldId, request, linkConfiguration);
|
||||||
|
|
||||||
CalculatedFieldTelemetryUpdateRequest request = attributesUpdated
|
|
||||||
? new CalculatedFieldAttributeUpdateRequest(
|
|
||||||
tenantId, entityId, AttributeScope.valueOf(proto.getScope()), updatedTelemetry,
|
|
||||||
proto.getPreviousCalculatedFieldsList().stream()
|
|
||||||
.map(cfIdProto -> new CalculatedFieldId(
|
|
||||||
new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB())))
|
|
||||||
.toList())
|
|
||||||
: new CalculatedFieldTimeSeriesUpdateRequest(
|
|
||||||
tenantId, entityId, updatedTelemetry,
|
|
||||||
proto.getPreviousCalculatedFieldsList().stream()
|
|
||||||
.map(cfIdProto -> new CalculatedFieldId(
|
|
||||||
new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB())))
|
|
||||||
.toList());
|
|
||||||
|
|
||||||
onTelemetryUpdate(request);
|
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.trace("Failed to process telemetry update msg: [{}]", proto, e);
|
log.trace("Failed to process telemetry update msg: [{}]", proto, e);
|
||||||
@ -486,8 +470,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
|
|
||||||
private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List<CalculatedFieldId> previousCalculatedFieldIds, Map<String, KvEntry> updatedTelemetry) {
|
private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List<CalculatedFieldId> previousCalculatedFieldIds, Map<String, KvEntry> updatedTelemetry) {
|
||||||
log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", tenantId, entityId, calculatedFieldId);
|
log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", tenantId, entityId, calculatedFieldId);
|
||||||
CalculatedField calculatedField = calculatedFieldCache.getCalculatedField(tenantId, calculatedFieldId);
|
CalculatedField calculatedField = calculatedFieldCache.getCalculatedField(calculatedFieldId);
|
||||||
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService);
|
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId, tbelInvokeService);
|
||||||
Map<String, ArgumentEntry> argumentValues = updatedTelemetry.entrySet().stream()
|
Map<String, ArgumentEntry> argumentValues = updatedTelemetry.entrySet().stream()
|
||||||
.collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue())));
|
.collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue())));
|
||||||
|
|
||||||
@ -524,7 +508,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
Map<String, ArgumentEntry> argumentsMap = proto.getArgumentsMap().entrySet().stream()
|
Map<String, ArgumentEntry> argumentsMap = proto.getArgumentsMap().entrySet().stream()
|
||||||
.collect(Collectors.toMap(Map.Entry::getKey, entry -> fromArgumentEntryProto(entry.getValue())));
|
.collect(Collectors.toMap(Map.Entry::getKey, entry -> fromArgumentEntryProto(entry.getValue())));
|
||||||
|
|
||||||
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(tenantId, calculatedFieldId, tbelInvokeService);
|
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId, tbelInvokeService);
|
||||||
updateOrInitializeState(calculatedFieldCtx, entityId, argumentsMap, previousCalculatedFieldIds);
|
updateOrInitializeState(calculatedFieldCtx, entityId, argumentsMap, previousCalculatedFieldIds);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.trace("Failed to process calculated field update state msg: [{}]", proto, e);
|
log.trace("Failed to process calculated field update state msg: [{}]", proto, e);
|
||||||
@ -559,7 +543,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
if (proto.getDeleted()) {
|
if (proto.getDeleted()) {
|
||||||
log.info("Executing profile entity deleted msg, tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
log.info("Executing profile entity deleted msg, tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
||||||
|
|
||||||
getCalculatedFieldLinks(tenantId, entityId, profileId)
|
getCalculatedFieldLinks(entityId, profileId)
|
||||||
.forEach(link -> clearState(tenantId, link.getCalculatedFieldId(), entityId));
|
.forEach(link -> clearState(tenantId, link.getCalculatedFieldId(), entityId));
|
||||||
} else {
|
} else {
|
||||||
log.info("Executing profile entity added msg, tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
log.info("Executing profile entity added msg, tenantId=[{}], entityId=[{}]", tenantId, entityId);
|
||||||
@ -585,7 +569,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
private void initializeStateForEntityByProfile(TenantId tenantId, EntityId entityId, EntityId profileId, TbCallback callback) {
|
private void initializeStateForEntityByProfile(TenantId tenantId, EntityId entityId, EntityId profileId, TbCallback callback) {
|
||||||
calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, profileId)
|
calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, profileId)
|
||||||
.stream()
|
.stream()
|
||||||
.map(cfId -> calculatedFieldCache.getCalculatedFieldCtx(tenantId, cfId, tbelInvokeService))
|
.map(cfId -> calculatedFieldCache.getCalculatedFieldCtx(cfId, tbelInvokeService))
|
||||||
.forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback));
|
.forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -722,10 +706,10 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<CalculatedFieldLink> getCalculatedFieldLinks(TenantId tenantId, EntityId entityId, EntityId profileId) {
|
private List<CalculatedFieldLink> getCalculatedFieldLinks(EntityId entityId, EntityId profileId) {
|
||||||
List<CalculatedFieldLink> links = new ArrayList<>(calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId));
|
List<CalculatedFieldLink> links = new ArrayList<>(calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId));
|
||||||
if (profileId != null) {
|
if (profileId != null) {
|
||||||
links.addAll(calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, profileId));
|
links.addAll(calculatedFieldCache.getCalculatedFieldLinksByEntityId(profileId));
|
||||||
}
|
}
|
||||||
return links;
|
return links;
|
||||||
}
|
}
|
||||||
@ -870,13 +854,22 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TransportProtos.TelemetryUpdateMsgProto buildTelemetryUpdateMsgProto(CalculatedFieldTelemetryUpdateRequest request) {
|
||||||
|
return buildTelemetryUpdateMsgProto(request, Collections.emptyList());
|
||||||
|
}
|
||||||
|
|
||||||
|
;
|
||||||
|
|
||||||
private TransportProtos.TelemetryUpdateMsgProto buildTelemetryUpdateMsgProto(
|
private TransportProtos.TelemetryUpdateMsgProto buildTelemetryUpdateMsgProto(
|
||||||
CalculatedFieldTelemetryUpdateRequest request, List<CalculatedFieldEntityCtxId> links
|
CalculatedFieldTelemetryUpdateRequest request, List<CalculatedFieldEntityCtxId> links
|
||||||
) {
|
) {
|
||||||
TransportProtos.TelemetryUpdateMsgProto.Builder builder = TransportProtos.TelemetryUpdateMsgProto.newBuilder();
|
TransportProtos.TelemetryUpdateMsgProto.Builder builder = TransportProtos.TelemetryUpdateMsgProto.newBuilder();
|
||||||
|
|
||||||
builder.setTenantIdMSB(request.getTenantId().getId().getMostSignificantBits())
|
builder.setTenantIdMSB(request.getTenantId().getId().getMostSignificantBits())
|
||||||
.setTenantIdLSB(request.getTenantId().getId().getLeastSignificantBits());
|
.setTenantIdLSB(request.getTenantId().getId().getLeastSignificantBits())
|
||||||
|
.setEntityType(request.getEntityId().getEntityType().name())
|
||||||
|
.setEntityIdMSB(request.getEntityId().getId().getMostSignificantBits())
|
||||||
|
.setEntityIdLSB(request.getEntityId().getId().getLeastSignificantBits());
|
||||||
|
|
||||||
for (CalculatedFieldEntityCtxId link : links) {
|
for (CalculatedFieldEntityCtxId link : links) {
|
||||||
builder.addLinks(toProto(link));
|
builder.addLinks(toProto(link));
|
||||||
@ -904,6 +897,31 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
|||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private CalculatedFieldTelemetryUpdateRequest fromProto(TransportProtos.TelemetryUpdateMsgProto proto) {
|
||||||
|
TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
|
||||||
|
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
||||||
|
|
||||||
|
List<KvEntry> updatedTelemetry = proto.getUpdatedTelemetryList().stream()
|
||||||
|
.map(ProtoUtils::fromTelemetryProto)
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
boolean attributesUpdated = StringUtils.isEmpty(proto.getScope());
|
||||||
|
|
||||||
|
return attributesUpdated
|
||||||
|
? new CalculatedFieldAttributeUpdateRequest(
|
||||||
|
tenantId, entityId, AttributeScope.valueOf(proto.getScope()), updatedTelemetry,
|
||||||
|
proto.getPreviousCalculatedFieldsList().stream()
|
||||||
|
.map(cfIdProto -> new CalculatedFieldId(
|
||||||
|
new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB())))
|
||||||
|
.toList())
|
||||||
|
: new CalculatedFieldTimeSeriesUpdateRequest(
|
||||||
|
tenantId, entityId, updatedTelemetry,
|
||||||
|
proto.getPreviousCalculatedFieldsList().stream()
|
||||||
|
.map(cfIdProto -> new CalculatedFieldId(
|
||||||
|
new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB())))
|
||||||
|
.toList());
|
||||||
|
}
|
||||||
|
|
||||||
private TransportProtos.CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) {
|
private TransportProtos.CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) {
|
||||||
return TransportProtos.CalculatedFieldEntityCtxIdProto.newBuilder()
|
return TransportProtos.CalculatedFieldEntityCtxIdProto.newBuilder()
|
||||||
.setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits())
|
.setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits())
|
||||||
|
|||||||
@ -194,7 +194,9 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
|||||||
}
|
}
|
||||||
} else if (EntityType.CALCULATED_FIELD.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
} else if (EntityType.CALCULATED_FIELD.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||||
if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.CREATED) {
|
if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.CREATED) {
|
||||||
calculatedFieldCache.updateCalculatedFieldLinks(componentLifecycleMsg.getTenantId(), (CalculatedFieldId) componentLifecycleMsg.getEntityId());
|
calculatedFieldCache.addCalculatedField(tenantId, (CalculatedFieldId) componentLifecycleMsg.getEntityId());
|
||||||
|
} else if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.UPDATED) {
|
||||||
|
calculatedFieldCache.updateCalculatedField(tenantId, (CalculatedFieldId) componentLifecycleMsg.getEntityId());
|
||||||
} else {
|
} else {
|
||||||
calculatedFieldCache.evict((CalculatedFieldId) componentLifecycleMsg.getEntityId());
|
calculatedFieldCache.evict((CalculatedFieldId) componentLifecycleMsg.getEntityId());
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,9 +22,12 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
|
|||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
|
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
|
||||||
import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration;
|
import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration;
|
||||||
|
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||||
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -90,6 +93,24 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel
|
|||||||
return linkConfiguration;
|
return linkConfiguration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<CalculatedFieldLink> buildCalculatedFieldLinks(TenantId tenantId, EntityId cfEntityId, CalculatedFieldId calculatedFieldId) {
|
||||||
|
return getReferencedEntities().stream()
|
||||||
|
.filter(referencedEntity -> !referencedEntity.equals(cfEntityId))
|
||||||
|
.map(referencedEntityId -> buildCalculatedFieldLink(tenantId, referencedEntityId, calculatedFieldId))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CalculatedFieldLink buildCalculatedFieldLink(TenantId tenantId, EntityId referencedEntityId, CalculatedFieldId calculatedFieldId) {
|
||||||
|
CalculatedFieldLink link = new CalculatedFieldLink();
|
||||||
|
link.setTenantId(tenantId);
|
||||||
|
link.setEntityId(referencedEntityId);
|
||||||
|
link.setCalculatedFieldId(calculatedFieldId);
|
||||||
|
link.setConfiguration(getReferencedEntityConfig(referencedEntityId));
|
||||||
|
return link;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public JsonNode calculatedFieldConfigToJson(EntityType entityType, UUID entityId) {
|
public JsonNode calculatedFieldConfigToJson(EntityType entityType, UUID entityId) {
|
||||||
ObjectNode configNode = mapper.createObjectNode();
|
ObjectNode configNode = mapper.createObjectNode();
|
||||||
|
|||||||
@ -20,9 +20,12 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
|
|||||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
|
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
|
||||||
import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration;
|
import org.thingsboard.server.common.data.cf.CalculatedFieldLinkConfiguration;
|
||||||
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
|
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
|
||||||
|
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -57,4 +60,8 @@ public interface CalculatedFieldConfiguration {
|
|||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
JsonNode calculatedFieldConfigToJson(EntityType entityType, UUID entityId);
|
JsonNode calculatedFieldConfigToJson(EntityType entityType, UUID entityId);
|
||||||
|
|
||||||
|
List<CalculatedFieldLink> buildCalculatedFieldLinks(TenantId tenantId, EntityId cfEntityId, CalculatedFieldId calculatedFieldId);
|
||||||
|
|
||||||
|
CalculatedFieldLink buildCalculatedFieldLink(TenantId tenantId, EntityId referencedEntityId, CalculatedFieldId calculatedFieldId);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -824,10 +824,13 @@ message ProfileEntityMsgProto {
|
|||||||
message TelemetryUpdateMsgProto {
|
message TelemetryUpdateMsgProto {
|
||||||
int64 tenantIdMSB = 1;
|
int64 tenantIdMSB = 1;
|
||||||
int64 tenantIdLSB = 2;
|
int64 tenantIdLSB = 2;
|
||||||
repeated CalculatedFieldEntityCtxIdProto links = 3;
|
string entityType = 3;
|
||||||
repeated CalculatedFieldIdProto previousCalculatedFields = 4;
|
int64 entityIdMSB = 4;
|
||||||
string scope = 5;
|
int64 entityIdLSB = 5;
|
||||||
repeated TelemetryProto updatedTelemetry = 6;
|
repeated CalculatedFieldEntityCtxIdProto links = 6;
|
||||||
|
repeated CalculatedFieldIdProto previousCalculatedFields = 7;
|
||||||
|
string scope = 8;
|
||||||
|
repeated TelemetryProto updatedTelemetry = 9;
|
||||||
}
|
}
|
||||||
|
|
||||||
message CalculatedFieldEntityCtxIdProto {
|
message CalculatedFieldEntityCtxIdProto {
|
||||||
|
|||||||
@ -38,7 +38,6 @@ import org.thingsboard.server.dao.service.DataValidator;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static org.thingsboard.server.dao.service.Validator.validateId;
|
import static org.thingsboard.server.dao.service.Validator.validateId;
|
||||||
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
|
import static org.thingsboard.server.dao.service.Validator.validatePageLink;
|
||||||
@ -240,23 +239,8 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void createOrUpdateCalculatedFieldLink(TenantId tenantId, CalculatedField calculatedField) {
|
private void createOrUpdateCalculatedFieldLink(TenantId tenantId, CalculatedField calculatedField) {
|
||||||
List<CalculatedFieldLink> links = buildCalculatedFieldLinks(tenantId, calculatedField);
|
List<CalculatedFieldLink> links = calculatedField.getConfiguration().buildCalculatedFieldLinks(tenantId, calculatedField.getEntityId(), calculatedField.getId());
|
||||||
links.forEach(link -> saveCalculatedFieldLink(tenantId, link));
|
links.forEach(link -> saveCalculatedFieldLink(tenantId, link));
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<CalculatedFieldLink> buildCalculatedFieldLinks(TenantId tenantId, CalculatedField calculatedField) {
|
|
||||||
CalculatedFieldConfiguration cfConfig = calculatedField.getConfiguration();
|
|
||||||
return cfConfig.getReferencedEntities().stream()
|
|
||||||
.filter(referencedEntity -> !referencedEntity.equals(calculatedField.getEntityId()))
|
|
||||||
.map(referencedEntityId -> {
|
|
||||||
CalculatedFieldLink link = new CalculatedFieldLink();
|
|
||||||
link.setTenantId(tenantId);
|
|
||||||
link.setEntityId(referencedEntityId);
|
|
||||||
link.setCalculatedFieldId(calculatedField.getId());
|
|
||||||
link.setConfiguration(cfConfig.getReferencedEntityConfig(referencedEntityId));
|
|
||||||
return link;
|
|
||||||
})
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,7 +16,6 @@
|
|||||||
package org.thingsboard.server.dao.sql.cf;
|
package org.thingsboard.server.dao.sql.cf;
|
||||||
|
|
||||||
import org.springframework.data.jpa.repository.JpaRepository;
|
import org.springframework.data.jpa.repository.JpaRepository;
|
||||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
|
||||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||||
import org.thingsboard.server.dao.model.sql.CalculatedFieldEntity;
|
import org.thingsboard.server.dao.model.sql.CalculatedFieldEntity;
|
||||||
|
|
||||||
@ -29,7 +28,7 @@ public interface CalculatedFieldRepository extends JpaRepository<CalculatedField
|
|||||||
|
|
||||||
List<CalculatedFieldId> findCalculatedFieldIdsByTenantIdAndEntityId(UUID tenantId, UUID entityId);
|
List<CalculatedFieldId> findCalculatedFieldIdsByTenantIdAndEntityId(UUID tenantId, UUID entityId);
|
||||||
|
|
||||||
List<CalculatedField> findAllByTenantIdAndEntityId(UUID tenantId, UUID entityId);
|
List<CalculatedFieldEntity> findAllByTenantIdAndEntityId(UUID tenantId, UUID entityId);
|
||||||
|
|
||||||
List<CalculatedFieldEntity> findAllByTenantId(UUID tenantId);
|
List<CalculatedFieldEntity> findAllByTenantId(UUID tenantId);
|
||||||
|
|
||||||
|
|||||||
@ -57,7 +57,7 @@ public class JpaCalculatedFieldDao extends JpaAbstractDao<CalculatedFieldEntity,
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<CalculatedField> findCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId) {
|
public List<CalculatedField> findCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId) {
|
||||||
return calculatedFieldRepository.findAllByTenantIdAndEntityId(tenantId.getId(), entityId.getId());
|
return DaoUtil.convertDataList(calculatedFieldRepository.findAllByTenantIdAndEntityId(tenantId.getId(), entityId.getId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user