Merge pull request #12526 from irynamatveieva/calculated-fields
Calculated Fields
This commit is contained in:
commit
1e39bf9eb5
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.server.controller;
|
||||
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpStatus;
|
||||
@ -24,17 +25,26 @@ import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.ResponseBody;
|
||||
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.config.annotations.ApiOperation;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.entitiy.cf.TbCalculatedFieldService;
|
||||
import org.thingsboard.server.service.security.permission.Operation;
|
||||
|
||||
import static org.thingsboard.server.controller.ControllerConstants.CF_TEXT_SEARCH_DESCRIPTION;
|
||||
import static org.thingsboard.server.controller.ControllerConstants.PAGE_DATA_PARAMETERS;
|
||||
import static org.thingsboard.server.controller.ControllerConstants.PAGE_NUMBER_DESCRIPTION;
|
||||
import static org.thingsboard.server.controller.ControllerConstants.PAGE_SIZE_DESCRIPTION;
|
||||
import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_DESCRIPTION;
|
||||
import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERTY_DESCRIPTION;
|
||||
import static org.thingsboard.server.controller.ControllerConstants.TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH;
|
||||
import static org.thingsboard.server.controller.ControllerConstants.UUID_WIKI_LINK;
|
||||
|
||||
@ -81,6 +91,26 @@ public class CalculatedFieldController extends BaseController {
|
||||
return calculatedField;
|
||||
}
|
||||
|
||||
@ApiOperation(value = "Get Calculated Fields (getCalculatedFields)",
|
||||
notes = "Returns a page of calculated fields. " + PAGE_DATA_PARAMETERS
|
||||
)
|
||||
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
|
||||
@RequestMapping(value = "/calculatedFields", params = {"pageSize", "page"}, method = RequestMethod.GET)
|
||||
@ResponseBody
|
||||
public PageData<CalculatedField> getCalculatedFields(
|
||||
@Parameter(description = PAGE_SIZE_DESCRIPTION, required = true)
|
||||
@RequestParam int pageSize,
|
||||
@Parameter(description = PAGE_NUMBER_DESCRIPTION, required = true)
|
||||
@RequestParam int page,
|
||||
@Parameter(description = CF_TEXT_SEARCH_DESCRIPTION)
|
||||
@RequestParam(required = false) String textSearch,
|
||||
@Parameter(description = SORT_PROPERTY_DESCRIPTION, schema = @Schema(allowableValues = {"createdTime", "name"}))
|
||||
@RequestParam(required = false) String sortProperty,
|
||||
@Parameter(description = SORT_ORDER_DESCRIPTION, schema = @Schema(allowableValues = {"ASC", "DESC"}))
|
||||
@RequestParam(required = false) String sortOrder) throws ThingsboardException {
|
||||
PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder);
|
||||
return checkNotNull(calculatedFieldService.findAllCalculatedFields(pageLink));
|
||||
}
|
||||
|
||||
@ApiOperation(value = "Delete Calculated Field (deleteCalculatedField)",
|
||||
notes = "Deletes the calculated field. Referencing non-existing Calculated Field Id will cause an error." + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH)
|
||||
|
||||
@ -96,6 +96,7 @@ public class ControllerConstants {
|
||||
protected static final String EDGE_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on the edge name.";
|
||||
protected static final String EVENT_TEXT_SEARCH_DESCRIPTION = "The value is not used in searching.";
|
||||
protected static final String AUDIT_LOG_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on one of the next properties: entityType, entityName, userName, actionType, actionStatus.";
|
||||
protected static final String CF_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on the calculated field name.";
|
||||
protected static final String SORT_PROPERTY_DESCRIPTION = "Property of entity to sort by";
|
||||
|
||||
protected static final String SORT_ORDER_DESCRIPTION = "Sort order. ASC (ASCENDING) or DESC (DESCENDING)";
|
||||
|
||||
@ -41,10 +41,6 @@ public interface CalculatedFieldCache {
|
||||
|
||||
Set<EntityId> getEntitiesByProfile(TenantId tenantId, EntityId entityId);
|
||||
|
||||
void evictProfile(TenantId tenantId, EntityId entityId);
|
||||
|
||||
void evictEntity(TenantId tenantId, EntityId entityId);
|
||||
|
||||
void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId);
|
||||
|
||||
void updateCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId);
|
||||
|
||||
@ -19,6 +19,7 @@ import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
|
||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
@ -48,6 +49,8 @@ public interface CalculatedFieldExecutionService {
|
||||
|
||||
void pushStateToStorage(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state, TbCallback callback);
|
||||
|
||||
void pushCalculatedFieldLifecycleMsgToQueue(CalculatedField calculatedField, ComponentLifecycleMsgProto proto);
|
||||
|
||||
ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId);
|
||||
|
||||
// void pushEntityUpdateMsg(TransportProtos.CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback);
|
||||
|
||||
@ -22,22 +22,15 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
|
||||
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.AssetProfileId;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||
import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
|
||||
import org.thingsboard.server.dao.asset.AssetService;
|
||||
import org.thingsboard.server.dao.cf.CalculatedFieldService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
@ -179,33 +172,6 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
||||
return entities;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evictProfile(TenantId tenantId, EntityId entityId) {
|
||||
log.debug("[{}] evict entity profile from cache.", entityId);
|
||||
profileEntities.remove(entityId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evictEntity(TenantId tenantId, EntityId entityId) {
|
||||
calculatedFieldFetchLock.lock();
|
||||
try {
|
||||
profileEntities.forEach((profile, entityIds) -> entityIds.remove(entityId));
|
||||
if (EntityType.ASSET.equals(entityId.getEntityType())) {
|
||||
Asset asset = assetService.findAssetById(tenantId, (AssetId) entityId);
|
||||
if (asset != null) {
|
||||
profileEntities.computeIfAbsent(asset.getAssetProfileId(), profileId -> new HashSet<>()).add(entityId);
|
||||
}
|
||||
} else {
|
||||
Device device = deviceService.findDeviceById(tenantId, (DeviceId) entityId);
|
||||
if (device != null) {
|
||||
profileEntities.computeIfAbsent(device.getDeviceProfileId(), profileId -> new HashSet<>()).add(entityId);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
calculatedFieldFetchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
|
||||
calculatedFieldFetchLock.lock();
|
||||
|
||||
@ -118,6 +118,8 @@ import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
|
||||
import static org.thingsboard.server.common.util.ProtoUtils.toTsKvProto;
|
||||
import static org.thingsboard.server.queue.discovery.HashPartitionService.CALCULATED_FIELD_QUEUE_KEY;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@ -325,6 +327,26 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
states.keySet().removeIf(ctxId -> ctxId.cfId().equals(calculatedFieldId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushCalculatedFieldLifecycleMsgToQueue(CalculatedField calculatedField, ComponentLifecycleMsgProto proto) {
|
||||
EntityId entityId = calculatedField.getEntityId();
|
||||
ToCalculatedFieldMsg msg = ToCalculatedFieldMsg.newBuilder().setComponentLifecycleMsg(proto).build();
|
||||
switch (entityId.getEntityType()) {
|
||||
case ASSET, DEVICE -> {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId);
|
||||
clusterService.pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, null);
|
||||
}
|
||||
case ASSET_PROFILE, DEVICE_PROFILE -> {
|
||||
Set<TopicPartitionInfo> tpiSet = calculatedFieldCache.getEntitiesByProfile(calculatedField.getTenantId(), entityId).stream()
|
||||
.map(targetEntityId -> partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, targetEntityId))
|
||||
.collect(Collectors.toSet());
|
||||
tpiSet.forEach(tpi -> clusterService.pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, null));
|
||||
}
|
||||
default -> throw new IllegalArgumentException("Entity type '" + calculatedField.getId().getEntityType()
|
||||
+ "' does not support calculated fields.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCalculatedFieldLifecycleMsg(ComponentLifecycleMsgProto proto, TbCallback callback) {
|
||||
try {
|
||||
@ -505,7 +527,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
CalculatedFieldTelemetryUpdateRequest request = fromProto(proto.getMsg());
|
||||
|
||||
if (proto.getLinksList().isEmpty()) {
|
||||
onTelemetryUpdate(proto, callback);
|
||||
onTelemetryUpdate(proto.getMsg(), callback);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -803,7 +825,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
List<Long> versions = result.getVersions();
|
||||
for (int i = 0; i < entries.size(); i++) {
|
||||
long tsVersion = versions.get(i);
|
||||
TsKvProto tsProto = ProtoUtils.toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build();
|
||||
TsKvProto tsProto = toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build();
|
||||
telemetryMsg.addTsData(tsProto);
|
||||
}
|
||||
msg.setTelemetryMsg(telemetryMsg.build());
|
||||
@ -837,7 +859,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
telemetryMsg.setEntityIdMSB(entityId.getId().getMostSignificantBits());
|
||||
telemetryMsg.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
|
||||
|
||||
if(calculatedFieldIds != null) {
|
||||
if (calculatedFieldIds != null) {
|
||||
for (CalculatedFieldId cfId : calculatedFieldIds) {
|
||||
telemetryMsg.addPreviousCalculatedFields(toProto(cfId));
|
||||
}
|
||||
|
||||
@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
@ -87,7 +88,7 @@ public class EntityStateSourcingListener {
|
||||
case ASSET -> {
|
||||
onAssetUpdate(event.getEntity(), event.getOldEntity());
|
||||
}
|
||||
case ASSET_PROFILE, ENTITY_VIEW, NOTIFICATION_RULE, CALCULATED_FIELD -> {
|
||||
case ASSET_PROFILE, ENTITY_VIEW, NOTIFICATION_RULE -> {
|
||||
tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, lifecycleEvent);
|
||||
}
|
||||
case RULE_CHAIN -> {
|
||||
@ -122,6 +123,9 @@ public class EntityStateSourcingListener {
|
||||
ApiUsageState apiUsageState = (ApiUsageState) event.getEntity();
|
||||
tbClusterService.onApiStateChange(apiUsageState, null);
|
||||
}
|
||||
case CALCULATED_FIELD -> {
|
||||
onCalculatedFieldUpdate(event.getEntity(), event.getOldEntity(), lifecycleEvent);
|
||||
}
|
||||
default -> {
|
||||
}
|
||||
}
|
||||
@ -146,7 +150,7 @@ public class EntityStateSourcingListener {
|
||||
Asset asset = (Asset) event.getEntity();
|
||||
tbClusterService.onAssetDeleted(tenantId, asset, null);
|
||||
}
|
||||
case ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE, CALCULATED_FIELD -> {
|
||||
case ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE -> {
|
||||
tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, ComponentLifecycleEvent.DELETED);
|
||||
}
|
||||
case NOTIFICATION_REQUEST -> {
|
||||
@ -187,6 +191,11 @@ public class EntityStateSourcingListener {
|
||||
TbResourceInfo tbResource = (TbResourceInfo) event.getEntity();
|
||||
tbClusterService.onResourceDeleted(tbResource, null);
|
||||
}
|
||||
case CALCULATED_FIELD -> {
|
||||
CalculatedField calculatedField = (CalculatedField) event.getEntity();
|
||||
ComponentLifecycleMsg lifecycleMsg = new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), ComponentLifecycleEvent.DELETED);
|
||||
tbClusterService.onCalculatedFieldDeleted(calculatedField.getTenantId(), calculatedField, lifecycleMsg);
|
||||
}
|
||||
default -> {
|
||||
}
|
||||
}
|
||||
@ -267,6 +276,15 @@ public class EntityStateSourcingListener {
|
||||
}
|
||||
}
|
||||
|
||||
private void onCalculatedFieldUpdate(Object entity, Object oldEntity, ComponentLifecycleEvent lifecycleEvent) {
|
||||
CalculatedField calculatedField = (CalculatedField) entity;
|
||||
CalculatedField oldCalculatedField = null;
|
||||
if (oldEntity instanceof CalculatedField) {
|
||||
oldCalculatedField = (CalculatedField) oldEntity;
|
||||
}
|
||||
tbClusterService.onCalculatedFieldUpdated(calculatedField, oldCalculatedField, new ComponentLifecycleMsg(calculatedField.getTenantId(), calculatedField.getId(), lifecycleEvent));
|
||||
}
|
||||
|
||||
private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) {
|
||||
String data = JacksonUtil.toString(JacksonUtil.valueToTree(assignedDevice));
|
||||
if (data != null) {
|
||||
|
||||
@ -36,10 +36,12 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.queue.QueueConfig;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.util.ProtoUtils;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
|
||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||
@ -163,8 +165,15 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
|
||||
log.trace("[{}] Forwarding regular telemetry message for processing {}", id, toCfMsg.getTelemetryMsg());
|
||||
forwardToActorSystem(toCfMsg.getTelemetryMsg(), callback);
|
||||
} else if (toCfMsg.hasLinkedTelemetryMsg()) {
|
||||
log.trace("[{}] Forwarding linked telemetry message for processing {}", id, toCfMsg.getLinkedTelemetryMsg());
|
||||
forwardToActorSystem(toCfMsg.getLinkedTelemetryMsg(), callback);
|
||||
} else if (toCfMsg.hasComponentLifecycleMsg()) {
|
||||
log.trace("[{}] Forwarding component lifecycle message for processing {}", id, toCfMsg.getComponentLifecycleMsg());
|
||||
/// TODO: forward to Actor system
|
||||
forwardToCalculatedFieldService(toCfMsg.getComponentLifecycleMsg(), callback);
|
||||
} else if (toCfMsg.hasEntityUpdateMsg()) {
|
||||
log.trace("[{}] Forwarding entity update message for processing {}", id, toCfMsg.getEntityUpdateMsg());
|
||||
/// TODO: forward to Actor system
|
||||
forwardToCalculatedFieldService(toCfMsg.getEntityUpdateMsg(), callback);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}] Failed to process message: {}", id, msg, e);
|
||||
@ -214,36 +223,17 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
|
||||
protected void handleNotification(UUID id, TbProtoQueueMsg<ToCalculatedFieldNotificationMsg> msg, TbCallback callback) {
|
||||
ToCalculatedFieldNotificationMsg toCfNotification = msg.getValue();
|
||||
if (toCfNotification.hasComponentLifecycle()) {
|
||||
// from upstream (maybe removed since we dont need to init state for each partition)
|
||||
forwardToActorSystem(toCfNotification.getComponentLifecycle(), callback);
|
||||
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCfNotification.getComponentLifecycle()));
|
||||
} else if (toCfNotification.hasEntityUpdateMsg()) {
|
||||
processEntityUpdateMsg(toCfNotification.getEntityUpdateMsg());
|
||||
// from upstream (maybe removed since we dont need to update state for each partition)
|
||||
forwardToActorSystem(toCfNotification.getEntityUpdateMsg(), callback);
|
||||
}
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
// private void processEntityProfileUpdateMsg(TransportProtos.EntityProfileUpdateMsgProto profileUpdateMsg) {
|
||||
// var tenantId = toTenantId(profileUpdateMsg.getTenantIdMSB(), profileUpdateMsg.getTenantIdLSB());
|
||||
// var entityId = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityType(), new UUID(profileUpdateMsg.getEntityIdMSB(), profileUpdateMsg.getEntityIdLSB()));
|
||||
// var oldProfile = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getOldProfileIdMSB(), profileUpdateMsg.getOldProfileIdLSB()));
|
||||
// var newProfile = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getNewProfileIdMSB(), profileUpdateMsg.getNewProfileIdLSB()));
|
||||
// calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfile).remove(entityId);
|
||||
// calculatedFieldCache.getEntitiesByProfile(tenantId, newProfile).add(entityId);
|
||||
// }
|
||||
//
|
||||
// private void processProfileEntityMsg(TransportProtos.ProfileEntityMsgProto profileEntityMsg) {
|
||||
// var tenantId = toTenantId(profileEntityMsg.getTenantIdMSB(), profileEntityMsg.getTenantIdLSB());
|
||||
// var entityId = EntityIdFactory.getByTypeAndUuid(profileEntityMsg.getEntityType(), new UUID(profileEntityMsg.getEntityIdMSB(), profileEntityMsg.getEntityIdLSB()));
|
||||
// var profileId = EntityIdFactory.getByTypeAndUuid(profileEntityMsg.getEntityProfileType(), new UUID(profileEntityMsg.getProfileIdMSB(), profileEntityMsg.getProfileIdLSB()));
|
||||
// boolean added = profileEntityMsg.getAdded();
|
||||
// Set<EntityId> entitiesByProfile = calculatedFieldCache.getEntitiesByProfile(tenantId, profileId);
|
||||
// if (added) {
|
||||
// entitiesByProfile.add(entityId);
|
||||
// } else {
|
||||
// entitiesByProfile.remove(entityId);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
|
||||
private void forwardToActorSystem(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) {
|
||||
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
|
||||
var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
|
||||
@ -257,7 +247,7 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
|
||||
actorContext.tell(new CalculatedFieldLinkedTelemetryMsg(tenantId, entityId, linkedMsg, callback));
|
||||
}
|
||||
|
||||
private void forwardToActorSystem(TransportProtos.ComponentLifecycleMsgProto msg, TbCallback callback) {
|
||||
private void forwardToCalculatedFieldService(ComponentLifecycleMsgProto msg, TbCallback callback) {
|
||||
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
|
||||
var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
|
||||
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onCalculatedFieldLifecycleMsg(msg, callback));
|
||||
@ -269,7 +259,19 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
|
||||
});
|
||||
}
|
||||
|
||||
private void forwardToActorSystem(TransportProtos.CalculatedFieldEntityUpdateMsgProto msg, TbCallback callback) {
|
||||
private void forwardToActorSystem(ComponentLifecycleMsgProto msg, TbCallback callback) {
|
||||
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
|
||||
var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
|
||||
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onCalculatedFieldLifecycleMsg(msg, callback));
|
||||
DonAsynchron.withCallback(future,
|
||||
__ -> callback.onSuccess(),
|
||||
t -> {
|
||||
log.warn("[{}] Failed to process calculated field message for calculated field [{}]", tenantId.getId(), calculatedFieldId.getId(), t);
|
||||
callback.onFailure(t);
|
||||
});
|
||||
}
|
||||
|
||||
private void forwardToCalculatedFieldService(CalculatedFieldEntityUpdateMsgProto msg, TbCallback callback) {
|
||||
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
|
||||
var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
|
||||
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityUpdateMsg(msg, callback));
|
||||
@ -281,6 +283,35 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
|
||||
});
|
||||
}
|
||||
|
||||
private void forwardToActorSystem(CalculatedFieldEntityUpdateMsgProto msg, TbCallback callback) {
|
||||
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
|
||||
var entityId = EntityIdFactory.getByTypeAndUuid(msg.getEntityType(), new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
|
||||
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityUpdateMsg(msg, callback));
|
||||
DonAsynchron.withCallback(future,
|
||||
__ -> callback.onSuccess(),
|
||||
t -> {
|
||||
log.warn("[{}] Failed to process entity updated message for entity [{}]", tenantId.getId(), entityId.getId(), t);
|
||||
callback.onFailure(t);
|
||||
});
|
||||
}
|
||||
|
||||
private void processEntityUpdateMsg(CalculatedFieldEntityUpdateMsgProto entityUpdateMsg) {
|
||||
var tenantId = toTenantId(entityUpdateMsg.getTenantIdMSB(), entityUpdateMsg.getTenantIdLSB());
|
||||
var entityId = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityType(), new UUID(entityUpdateMsg.getEntityIdMSB(), entityUpdateMsg.getEntityIdLSB()));
|
||||
if (entityUpdateMsg.getAdded()) {
|
||||
var newProfile = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityProfileType(), new UUID(entityUpdateMsg.getNewProfileIdMSB(), entityUpdateMsg.getNewProfileIdLSB()));
|
||||
calculatedFieldCache.getEntitiesByProfile(tenantId, newProfile).add(entityId);
|
||||
} else if (entityUpdateMsg.getDeleted()) {
|
||||
var oldProfile = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityProfileType(), new UUID(entityUpdateMsg.getOldProfileIdMSB(), entityUpdateMsg.getOldProfileIdLSB()));
|
||||
calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfile).remove(entityId);
|
||||
} else if (entityUpdateMsg.getUpdated()) {
|
||||
var oldProfile = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityProfileType(), new UUID(entityUpdateMsg.getOldProfileIdMSB(), entityUpdateMsg.getOldProfileIdLSB()));
|
||||
var newProfile = EntityIdFactory.getByTypeAndUuid(entityUpdateMsg.getEntityProfileType(), new UUID(entityUpdateMsg.getNewProfileIdMSB(), entityUpdateMsg.getNewProfileIdLSB()));
|
||||
calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfile).remove(entityId);
|
||||
calculatedFieldCache.getEntitiesByProfile(tenantId, newProfile).add(entityId);
|
||||
}
|
||||
}
|
||||
|
||||
private void throwNotHandled(Object msg, TbCallback callback) {
|
||||
log.warn("Message not handled: {}", msg);
|
||||
callback.onFailure(new RuntimeException("Message not handled!"));
|
||||
|
||||
@ -38,10 +38,12 @@ import org.thingsboard.server.common.data.TbResourceInfo;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.AssetProfileId;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
@ -68,6 +70,7 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
|
||||
import org.thingsboard.server.common.util.ProtoUtils;
|
||||
import org.thingsboard.server.dao.edge.EdgeService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.EdgeNotificationMsgProto;
|
||||
@ -95,6 +98,7 @@ import org.thingsboard.server.queue.common.TbRuleEngineProducerService;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
|
||||
import org.thingsboard.server.service.gateway_device.GatewayNotificationsService;
|
||||
import org.thingsboard.server.service.ota.OtaPackageStateService;
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
@ -145,6 +149,10 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
@Lazy
|
||||
private OtaPackageStateService otaPackageStateService;
|
||||
|
||||
@Autowired
|
||||
@Lazy
|
||||
private CalculatedFieldExecutionService calculatedFieldExecutionService;
|
||||
|
||||
private final TopicService topicService;
|
||||
private final TbDeviceProfileCache deviceProfileCache;
|
||||
private final TbAssetProfileCache assetProfileCache;
|
||||
@ -342,21 +350,21 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
public void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback) {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId);
|
||||
producerProvider.getCalculatedFieldsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback);
|
||||
toCoreMsgs.incrementAndGet();
|
||||
toRuleEngineMsgs.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushMsgToCalculatedFields(TopicPartitionInfo tpi, UUID msgId, ToCalculatedFieldMsg msg, TbQueueCallback callback) {
|
||||
log.trace("PUSHING msg: {} to:{}", msg, tpi);
|
||||
producerProvider.getCalculatedFieldsMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback);
|
||||
toRuleEngineNfs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS
|
||||
toRuleEngineMsgs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId);
|
||||
producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback);
|
||||
toCoreMsgs.incrementAndGet();
|
||||
toRuleEngineNfs.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -686,6 +694,20 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
broadcastEntityStateChangeEvent(asset.getTenantId(), asset.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField, ComponentLifecycleMsg lifecycleMsg) {
|
||||
var created = oldCalculatedField == null;
|
||||
calculatedFieldExecutionService.pushCalculatedFieldLifecycleMsgToQueue(calculatedField, toProto(lifecycleMsg));
|
||||
broadcastEntityStateChangeEvent(calculatedField.getTenantId(), calculatedField.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCalculatedFieldDeleted(TenantId tenantId, CalculatedField calculatedField, ComponentLifecycleMsg lifecycleMsg) {
|
||||
CalculatedFieldId calculatedFieldId = calculatedField.getId();
|
||||
calculatedFieldExecutionService.pushCalculatedFieldLifecycleMsgToQueue(calculatedField, toProto(lifecycleMsg));
|
||||
broadcastEntityStateChangeEvent(tenantId, calculatedFieldId, ComponentLifecycleEvent.DELETED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId originatorEdgeId) {
|
||||
if (!edgesEnabled) {
|
||||
@ -827,7 +849,7 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
}
|
||||
|
||||
private void handleCalculatedFieldEntityUpdateEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId, boolean added, boolean updated, boolean deleted) {
|
||||
TransportProtos.CalculatedFieldEntityUpdateMsgProto.Builder builder = TransportProtos.CalculatedFieldEntityUpdateMsgProto.newBuilder();
|
||||
CalculatedFieldEntityUpdateMsgProto.Builder builder = CalculatedFieldEntityUpdateMsgProto.newBuilder();
|
||||
builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
|
||||
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
|
||||
builder.setEntityType(entityId.getEntityType().name());
|
||||
@ -846,9 +868,22 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
builder.setAdded(added);
|
||||
builder.setUpdated(updated);
|
||||
builder.setDeleted(deleted);
|
||||
TransportProtos.CalculatedFieldEntityUpdateMsgProto msg = builder.build();
|
||||
CalculatedFieldEntityUpdateMsgProto msg = builder.build();
|
||||
|
||||
pushNotificationToCalculatedFields(tenantId, entityId, ToCalculatedFieldNotificationMsg.newBuilder().setEntityUpdateMsg(msg).build(), null);
|
||||
broadcastEntityUpdateEvent(msg);
|
||||
pushMsgToCalculatedFields(tenantId, entityId, ToCalculatedFieldMsg.newBuilder().setEntityUpdateMsg(msg).build(), null);
|
||||
}
|
||||
|
||||
private void broadcastEntityUpdateEvent(CalculatedFieldEntityUpdateMsgProto proto) {
|
||||
TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> toCalculatedFieldProducer = producerProvider.getCalculatedFieldsNotificationsMsgProducer();
|
||||
Set<String> tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE);
|
||||
Set<String> tbCalculatedFieldServices = new HashSet<>(tbRuleEngineServices);
|
||||
for (String serviceId : tbCalculatedFieldServices) {
|
||||
TopicPartitionInfo tpi = topicService.getCalculatedFieldNotificationsTopic(serviceId);
|
||||
ToCalculatedFieldNotificationMsg toCfNotificationMsg = ToCalculatedFieldNotificationMsg.newBuilder().setEntityUpdateMsg(proto).build();
|
||||
toCalculatedFieldProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), toCfNotificationMsg), null);
|
||||
toRuleEngineNfs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -178,16 +178,12 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
||||
}
|
||||
} else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
deviceProfileCache.evict(tenantId, new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
|
||||
calculatedFieldCache.evictProfile(tenantId, new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
|
||||
} else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
deviceProfileCache.evict(tenantId, new DeviceId(componentLifecycleMsg.getEntityId().getId()));
|
||||
calculatedFieldCache.evictEntity(tenantId, new DeviceId(componentLifecycleMsg.getEntityId().getId()));
|
||||
} else if (EntityType.ASSET_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
assetProfileCache.evict(tenantId, new AssetProfileId(componentLifecycleMsg.getEntityId().getId()));
|
||||
calculatedFieldCache.evictProfile(tenantId, new AssetProfileId(componentLifecycleMsg.getEntityId().getId()));
|
||||
} else if (EntityType.ASSET.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
assetProfileCache.evict(tenantId, new AssetId(componentLifecycleMsg.getEntityId().getId()));
|
||||
calculatedFieldCache.evictEntity(tenantId, new AssetId(componentLifecycleMsg.getEntityId().getId()));
|
||||
} else if (EntityType.ENTITY_VIEW.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
actorContext.getTbEntityViewService().onComponentLifecycleMsg(componentLifecycleMsg);
|
||||
} else if (EntityType.API_USAGE_STATE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
|
||||
@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.TbResourceInfo;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
@ -127,4 +128,8 @@ public interface TbClusterService extends TbQueueClusterService {
|
||||
|
||||
void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId sourceEdgeId);
|
||||
|
||||
void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField, ComponentLifecycleMsg lifecycleMsg);
|
||||
|
||||
void onCalculatedFieldDeleted(TenantId tenantId, CalculatedField calculatedField, ComponentLifecycleMsg lifecycleMsg);
|
||||
|
||||
}
|
||||
|
||||
@ -90,6 +90,7 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceDeleteMsg;
|
||||
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
|
||||
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
||||
@ -1632,6 +1632,8 @@ message ToEdgeEventNotificationMsg {
|
||||
message ToCalculatedFieldMsg {
|
||||
CalculatedFieldTelemetryMsgProto telemetryMsg = 1;
|
||||
CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsg = 2;
|
||||
ComponentLifecycleMsgProto componentLifecycleMsg = 3;
|
||||
CalculatedFieldEntityUpdateMsgProto entityUpdateMsg = 4;
|
||||
}
|
||||
|
||||
message ToCalculatedFieldNotificationMsg {
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.queue.provider;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
@ -50,7 +51,7 @@ public class TbCoreQueueProducerProvider implements TbQueueProducerProvider {
|
||||
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> toUsageStats;
|
||||
private TbQueueProducer<TbProtoQueueMsg<ToVersionControlServiceMsg>> toVersionControl;
|
||||
private TbQueueProducer<TbProtoQueueMsg<ToHousekeeperServiceMsg>> toHousekeeper;
|
||||
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> toCalculatedFields;
|
||||
private TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldMsg>> toCalculatedFields;
|
||||
private TbQueueProducer<TbProtoQueueMsg<ToCalculatedFieldNotificationMsg>> toCalculatedFieldNotifications;
|
||||
|
||||
public TbCoreQueueProducerProvider(TbCoreQueueFactory tbQueueProvider) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user