Merge with origin
This commit is contained in:
commit
e09ef84f43
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.cf;
|
||||
|
||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
@ -42,6 +41,10 @@ public interface CalculatedFieldCache {
|
||||
|
||||
Set<EntityId> getEntitiesByProfile(TenantId tenantId, EntityId entityId);
|
||||
|
||||
void evictProfile(TenantId tenantId, EntityId entityId);
|
||||
|
||||
void evictEntity(TenantId tenantId, EntityId entityId);
|
||||
|
||||
void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId);
|
||||
|
||||
void updateCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId);
|
||||
|
||||
@ -23,7 +23,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntit
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto;
|
||||
import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTelemetryUpdateRequest;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ -48,9 +47,9 @@ public interface CalculatedFieldExecutionService {
|
||||
|
||||
void onCalculatedFieldLifecycleMsg(ComponentLifecycleMsgProto proto, TbCallback callback);
|
||||
|
||||
void onTelemetryUpdate(CalculatedFieldTelemetryUpdateRequest calculatedFieldTelemetryUpdateRequest);
|
||||
void onTelemetryUpdate(CalculatedFieldTelemetryMsgProto proto, TbCallback callback);
|
||||
|
||||
// void onTelemetryUpdateMsg(TransportProtos.TelemetryUpdateMsgProto proto);
|
||||
void onTelemetryUpdate(CalculatedFieldLinkedTelemetryMsgProto proto, TbCallback callback);
|
||||
|
||||
void onEntityUpdateMsg(CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback);
|
||||
|
||||
|
||||
@ -22,11 +22,16 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.script.api.tbel.TbelInvokeService;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
|
||||
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.AssetProfileId;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
@ -172,6 +177,33 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache {
|
||||
return entities;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evictProfile(TenantId tenantId, EntityId entityId) {
|
||||
log.debug("[{}] evict entity profile from cache.", entityId);
|
||||
profileEntities.remove(entityId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evictEntity(TenantId tenantId, EntityId entityId) {
|
||||
calculatedFieldFetchLock.lock();
|
||||
try {
|
||||
profileEntities.forEach((profile, entityIds) -> entityIds.remove(entityId));
|
||||
if (EntityType.ASSET.equals(entityId.getEntityType())) {
|
||||
Asset asset = assetService.findAssetById(tenantId, (AssetId) entityId);
|
||||
if (asset != null) {
|
||||
profileEntities.computeIfAbsent(asset.getAssetProfileId(), profileId -> new HashSet<>()).add(entityId);
|
||||
}
|
||||
} else {
|
||||
Device device = deviceService.findDeviceById(tenantId, (DeviceId) entityId);
|
||||
if (device != null) {
|
||||
profileEntities.computeIfAbsent(device.getDeviceProfileId(), profileId -> new HashSet<>()).add(entityId);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
calculatedFieldFetchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
|
||||
calculatedFieldFetchLock.lock();
|
||||
|
||||
@ -36,6 +36,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
|
||||
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.AttributeScope;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
|
||||
@ -72,15 +73,17 @@ import org.thingsboard.server.common.util.ProtoUtils;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
import org.thingsboard.server.dao.cf.CalculatedFieldService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
|
||||
import org.thingsboard.server.queue.TbQueueCallback;
|
||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
||||
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx;
|
||||
@ -93,7 +96,9 @@ import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState;
|
||||
import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState;
|
||||
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
|
||||
import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry;
|
||||
import org.thingsboard.server.service.cf.telemetry.CalculatedFieldAttributeUpdateRequest;
|
||||
import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTelemetryUpdateRequest;
|
||||
import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTimeSeriesUpdateRequest;
|
||||
import org.thingsboard.server.service.partition.AbstractPartitionBasedService;
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
@ -347,7 +352,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
initializeStateForEntity(calculatedFieldCtx, entityId, callback);
|
||||
}
|
||||
case ASSET_PROFILE, DEVICE_PROFILE -> {
|
||||
log.info("Initializing state for all entities in profile: tenantICalculatedFieldMsgProtod=[{}], profileId=[{}]", tenantId, entityId);
|
||||
log.info("Initializing state for all entities in profile: tenantId=[{}], profileId=[{}]", tenantId, entityId);
|
||||
Map<String, Argument> commonArguments = calculatedFieldCtx.getArguments().entrySet().stream()
|
||||
.filter(entry -> entry.getValue().getRefEntityId() != null)
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
@ -414,8 +419,9 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTelemetryUpdate(CalculatedFieldTelemetryUpdateRequest request) {
|
||||
public void onTelemetryUpdate(CalculatedFieldTelemetryMsgProto proto, TbCallback callback) {
|
||||
try {
|
||||
CalculatedFieldTelemetryUpdateRequest request = fromProto(proto);
|
||||
EntityId entityId = request.getEntityId();
|
||||
|
||||
if (supportedReferencedEntities.contains(entityId.getEntityType())) {
|
||||
@ -431,15 +437,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
processCalculatedFieldLinks(request, tpiStatesToUpdate);
|
||||
if (!tpiStatesToUpdate.isEmpty()) {
|
||||
tpiStatesToUpdate.forEach((topicPartitionInfo, ctxIds) -> {
|
||||
TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(request, ctxIds);
|
||||
clusterService.pushMsgToRuleEngine(topicPartitionInfo, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||
.setCfTelemetryUpdateMsg(telemetryUpdateMsgProto).build(), null);
|
||||
CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsgProto = buildLinkedTelemetryMsgProto(proto, ctxIds);
|
||||
clusterService.pushMsgToCalculatedFields(topicPartitionInfo, UUID.randomUUID(), ToCalculatedFieldMsg.newBuilder().setLinkedTelemetryMsg(linkedTelemetryMsgProto).build(), null);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
TransportProtos.TelemetryUpdateMsgProto telemetryUpdateMsgProto = buildTelemetryUpdateMsgProto(request);
|
||||
clusterService.pushMsgToRuleEngine(tpi, UUID.randomUUID(), TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||
.setCfTelemetryUpdateMsg(telemetryUpdateMsgProto).build(), null);
|
||||
clusterService.pushMsgToCalculatedFields(tpi, UUID.randomUUID(), ToCalculatedFieldMsg.newBuilder().setTelemetryMsg(proto).build(), null);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
@ -492,30 +495,30 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
}
|
||||
}
|
||||
|
||||
// @Override
|
||||
// public void onTelemetryUpdateMsg(TransportProtos.TelemetryUpdateMsgProto proto) {
|
||||
// try {
|
||||
// CalculatedFieldTelemetryUpdateRequest request = fromProto(proto);
|
||||
//
|
||||
// if (proto.getLinksList().isEmpty()) {
|
||||
// onTelemetryUpdate(request);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// proto.getLinksList().forEach(ctxIdProto -> {
|
||||
// CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB()));
|
||||
// CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId, tbelInvokeService);
|
||||
//
|
||||
// Map<String, KvEntry> updatedTelemetry = request.getMappedTelemetry(ctx, request.getEntityId());
|
||||
// if (!updatedTelemetry.isEmpty()) {
|
||||
// EntityId targetEntityId = EntityIdFactory.getByTypeAndUuid(ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB()));
|
||||
// executeTelemetryUpdate(ctx, targetEntityId, request.getPreviousCalculatedFieldIds(), updatedTelemetry);
|
||||
// }
|
||||
// });
|
||||
// } catch (Exception e) {
|
||||
// log.trace("Failed to process telemetry update msg: [{}]", proto, e);
|
||||
// }
|
||||
// }
|
||||
@Override
|
||||
public void onTelemetryUpdate(CalculatedFieldLinkedTelemetryMsgProto proto, TbCallback callback) {
|
||||
try {
|
||||
CalculatedFieldTelemetryUpdateRequest request = fromProto(proto.getMsg());
|
||||
|
||||
if (proto.getLinksList().isEmpty()) {
|
||||
onTelemetryUpdate(proto, callback);
|
||||
return;
|
||||
}
|
||||
|
||||
proto.getLinksList().forEach(ctxIdProto -> {
|
||||
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(ctxIdProto.getCalculatedFieldIdMSB(), ctxIdProto.getCalculatedFieldIdLSB()));
|
||||
CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(calculatedFieldId);
|
||||
|
||||
Map<String, KvEntry> updatedTelemetry = request.getMappedTelemetry(ctx, request.getEntityId());
|
||||
if (!updatedTelemetry.isEmpty()) {
|
||||
EntityId targetEntityId = EntityIdFactory.getByTypeAndUuid(ctxIdProto.getEntityType(), new UUID(ctxIdProto.getEntityIdMSB(), ctxIdProto.getEntityIdLSB()));
|
||||
executeTelemetryUpdate(ctx, targetEntityId, request.getPreviousCalculatedFieldIds(), updatedTelemetry);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.trace("Failed to process telemetry update msg: [{}]", proto, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void executeTelemetryUpdate(CalculatedFieldCtx cfCtx, EntityId entityId, List<CalculatedFieldId> previousCalculatedFieldIds, Map<String, KvEntry> updatedTelemetry) {
|
||||
log.info("Received telemetry update msg: tenantId=[{}], entityId=[{}], calculatedFieldId=[{}]", cfCtx.getTenantId(), entityId, cfCtx.getCfId());
|
||||
@ -766,23 +769,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
return Futures.transform(tsRollingFuture, tsRolling -> tsRolling == null ? TsRollingArgumentEntry.EMPTY : ArgumentEntry.createTsRollingArgument(tsRolling), calculatedFieldCallbackExecutor);
|
||||
}
|
||||
|
||||
// private TransportProtos.CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) {
|
||||
// return TransportProtos.CalculatedFieldEntityCtxIdProto.newBuilder()
|
||||
// .setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits())
|
||||
// .setCalculatedFieldIdLSB(ctxId.cfId().getId().getLeastSignificantBits())
|
||||
// .setEntityType(ctxId.entityId().getEntityType().name())
|
||||
// .setEntityIdMSB(ctxId.entityId().getId().getMostSignificantBits())
|
||||
// .setEntityIdLSB(ctxId.entityId().getId().getLeastSignificantBits())
|
||||
// .build();
|
||||
// }
|
||||
//
|
||||
// private TransportProtos.CalculatedFieldIdProto toProto(CalculatedFieldId cfId) {
|
||||
// return TransportProtos.CalculatedFieldIdProto.newBuilder()
|
||||
// .setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits())
|
||||
// .setCalculatedFieldIdLSB(cfId.getId().getLeastSignificantBits())
|
||||
// .build();
|
||||
// }
|
||||
|
||||
private KvEntry createDefaultKvEntry(Argument argument) {
|
||||
String key = argument.getRefEntityKey().getKey();
|
||||
String defaultValue = argument.getDefaultValue();
|
||||
@ -834,22 +820,28 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
|
||||
|
||||
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds());
|
||||
for (TsKvEntry entry : request.getEntries()) {
|
||||
telemetryMsg.addTsData(ProtoUtils.toTsKvProto(entry));
|
||||
List<TsKvEntry> entries = request.getEntries();
|
||||
List<Long> versions = result.getVersions();
|
||||
for (int i = 0; i < entries.size(); i++) {
|
||||
long tsVersion = versions.get(i);
|
||||
TsKvProto tsProto = ProtoUtils.toTsKvProto(entries.get(i)).toBuilder().setVersion(tsVersion).build();
|
||||
telemetryMsg.addTsData(tsProto);
|
||||
}
|
||||
msg.setTelemetryMsg(telemetryMsg.build());
|
||||
|
||||
return msg.build();
|
||||
}
|
||||
|
||||
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List<Long> result) {
|
||||
//TODO: IM Use result in both methods to update the versions of telemetry/attributes.
|
||||
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, List<Long> versions) {
|
||||
ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder();
|
||||
|
||||
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds());
|
||||
telemetryMsg.setScope(AttributeScopeProto.valueOf(request.getScope().name()));
|
||||
for (AttributeKvEntry entry : request.getEntries()) {
|
||||
telemetryMsg.addAttrData(ProtoUtils.toProto(entry));
|
||||
List<AttributeKvEntry> entries = request.getEntries();
|
||||
for (int i = 0; i < entries.size(); i++) {
|
||||
long attrVersion = versions.get(i);
|
||||
AttributeValueProto attrProto = ProtoUtils.toProto(entries.get(i)).toBuilder().setVersion(attrVersion).build();
|
||||
telemetryMsg.addAttrData(attrProto);
|
||||
}
|
||||
msg.setTelemetryMsg(telemetryMsg.build());
|
||||
|
||||
@ -867,15 +859,66 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
|
||||
telemetryMsg.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
|
||||
|
||||
for (CalculatedFieldId cfId : calculatedFieldIds) {
|
||||
CalculatedFieldIdProto.Builder calculatedFieldIdProto = CalculatedFieldIdProto.newBuilder();
|
||||
calculatedFieldIdProto.setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits());
|
||||
calculatedFieldIdProto.setCalculatedFieldIdLSB(cfId.getId().getLeastSignificantBits());
|
||||
telemetryMsg.addPreviousCalculatedFields(calculatedFieldIdProto.build());
|
||||
telemetryMsg.addPreviousCalculatedFields(toProto(cfId));
|
||||
}
|
||||
|
||||
return telemetryMsg;
|
||||
}
|
||||
|
||||
private CalculatedFieldLinkedTelemetryMsgProto buildLinkedTelemetryMsgProto(CalculatedFieldTelemetryMsgProto telemetryProto, List<CalculatedFieldEntityCtxId> links) {
|
||||
TransportProtos.CalculatedFieldLinkedTelemetryMsgProto.Builder builder = TransportProtos.CalculatedFieldLinkedTelemetryMsgProto.newBuilder();
|
||||
builder.setMsg(telemetryProto);
|
||||
for (CalculatedFieldEntityCtxId link : links) {
|
||||
builder.addLinks(toProto(link));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private TransportProtos.CalculatedFieldEntityCtxIdProto toProto(CalculatedFieldEntityCtxId ctxId) {
|
||||
return TransportProtos.CalculatedFieldEntityCtxIdProto.newBuilder()
|
||||
.setCalculatedFieldIdMSB(ctxId.cfId().getId().getMostSignificantBits())
|
||||
.setCalculatedFieldIdLSB(ctxId.cfId().getId().getLeastSignificantBits())
|
||||
.setEntityType(ctxId.entityId().getEntityType().name())
|
||||
.setEntityIdMSB(ctxId.entityId().getId().getMostSignificantBits())
|
||||
.setEntityIdLSB(ctxId.entityId().getId().getLeastSignificantBits())
|
||||
.build();
|
||||
}
|
||||
|
||||
private TransportProtos.CalculatedFieldIdProto toProto(CalculatedFieldId cfId) {
|
||||
return TransportProtos.CalculatedFieldIdProto.newBuilder()
|
||||
.setCalculatedFieldIdMSB(cfId.getId().getMostSignificantBits())
|
||||
.setCalculatedFieldIdLSB(cfId.getId().getLeastSignificantBits())
|
||||
.build();
|
||||
}
|
||||
|
||||
private CalculatedFieldTelemetryUpdateRequest fromProto(CalculatedFieldTelemetryMsgProto proto) {
|
||||
TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB()));
|
||||
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
||||
|
||||
if (!proto.getTsDataList().isEmpty()) {
|
||||
List<TsKvEntry> updatedTelemetry = proto.getTsDataList().stream()
|
||||
.map(ProtoUtils::fromProto)
|
||||
.toList();
|
||||
return new CalculatedFieldTimeSeriesUpdateRequest(
|
||||
tenantId, entityId, updatedTelemetry,
|
||||
proto.getPreviousCalculatedFieldsList().stream()
|
||||
.map(cfIdProto -> new CalculatedFieldId(
|
||||
new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB())))
|
||||
.toList());
|
||||
} else {
|
||||
AttributeScope scope = AttributeScope.valueOf(proto.getScope().name());
|
||||
List<AttributeKvEntry> updatedTelemetry = proto.getAttrDataList().stream()
|
||||
.map(ProtoUtils::fromProto)
|
||||
.toList();
|
||||
return new CalculatedFieldAttributeUpdateRequest(
|
||||
tenantId, entityId, scope, updatedTelemetry,
|
||||
proto.getPreviousCalculatedFieldsList().stream()
|
||||
.map(cfIdProto -> new CalculatedFieldId(
|
||||
new UUID(cfIdProto.getCalculatedFieldIdMSB(), cfIdProto.getCalculatedFieldIdLSB())))
|
||||
.toList());
|
||||
}
|
||||
}
|
||||
|
||||
private static TbQueueCallback wrap(FutureCallback<Void> callback) {
|
||||
if (callback != null) {
|
||||
return new FutureCallbackWrapper(callback);
|
||||
|
||||
@ -33,7 +33,6 @@ import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
@ -88,7 +87,7 @@ public class EntityStateSourcingListener {
|
||||
case ASSET -> {
|
||||
onAssetUpdate(event.getEntity(), event.getOldEntity());
|
||||
}
|
||||
case ASSET_PROFILE, ENTITY_VIEW, NOTIFICATION_RULE -> {
|
||||
case ASSET_PROFILE, ENTITY_VIEW, NOTIFICATION_RULE, CALCULATED_FIELD -> {
|
||||
tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, lifecycleEvent);
|
||||
}
|
||||
case RULE_CHAIN -> {
|
||||
@ -123,9 +122,6 @@ public class EntityStateSourcingListener {
|
||||
ApiUsageState apiUsageState = (ApiUsageState) event.getEntity();
|
||||
tbClusterService.onApiStateChange(apiUsageState, null);
|
||||
}
|
||||
case CALCULATED_FIELD -> {
|
||||
onCalculatedFieldUpdate(event.getEntity(), event.getOldEntity());
|
||||
}
|
||||
default -> {
|
||||
}
|
||||
}
|
||||
@ -150,7 +146,7 @@ public class EntityStateSourcingListener {
|
||||
Asset asset = (Asset) event.getEntity();
|
||||
tbClusterService.onAssetDeleted(tenantId, asset, null);
|
||||
}
|
||||
case ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE -> {
|
||||
case ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE, CALCULATED_FIELD -> {
|
||||
tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, ComponentLifecycleEvent.DELETED);
|
||||
}
|
||||
case NOTIFICATION_REQUEST -> {
|
||||
@ -191,10 +187,6 @@ public class EntityStateSourcingListener {
|
||||
TbResourceInfo tbResource = (TbResourceInfo) event.getEntity();
|
||||
tbClusterService.onResourceDeleted(tbResource, null);
|
||||
}
|
||||
case CALCULATED_FIELD -> {
|
||||
CalculatedField calculatedField = (CalculatedField) event.getEntity();
|
||||
tbClusterService.onCalculatedFieldDeleted(tenantId, calculatedField, null);
|
||||
}
|
||||
default -> {
|
||||
}
|
||||
}
|
||||
@ -275,15 +267,6 @@ public class EntityStateSourcingListener {
|
||||
}
|
||||
}
|
||||
|
||||
private void onCalculatedFieldUpdate(Object entity, Object oldEntity) {
|
||||
CalculatedField calculatedField = (CalculatedField) entity;
|
||||
CalculatedField oldCalculatedField = null;
|
||||
if (oldEntity instanceof CalculatedField) {
|
||||
oldCalculatedField = (CalculatedField) oldEntity;
|
||||
}
|
||||
tbClusterService.onCalculatedFieldUpdated(calculatedField, oldCalculatedField);
|
||||
}
|
||||
|
||||
private void pushAssignedFromNotification(Tenant currentTenant, TenantId newTenantId, Device assignedDevice) {
|
||||
String data = JacksonUtil.toString(JacksonUtil.valueToTree(assignedDevice));
|
||||
if (data != null) {
|
||||
|
||||
@ -69,6 +69,7 @@ import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
|
||||
import org.thingsboard.server.common.data.mobile.app.MobileApp;
|
||||
import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
@ -98,9 +99,9 @@ import org.thingsboard.server.dao.device.DeviceCredentialsService;
|
||||
import org.thingsboard.server.dao.device.DeviceProfileService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.dao.exception.DataValidationException;
|
||||
import org.thingsboard.server.dao.mobile.MobileAppDao;
|
||||
import org.thingsboard.server.dao.notification.NotificationSettingsService;
|
||||
import org.thingsboard.server.dao.notification.NotificationTargetService;
|
||||
import org.thingsboard.server.dao.mobile.MobileAppDao;
|
||||
import org.thingsboard.server.dao.queue.QueueService;
|
||||
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||
import org.thingsboard.server.dao.settings.AdminSettingsService;
|
||||
@ -571,7 +572,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
|
||||
|
||||
private void save(DeviceId deviceId, String key, boolean value) {
|
||||
if (persistActivityToTelemetry) {
|
||||
ListenableFuture<Integer> saveFuture = tsService.save(
|
||||
ListenableFuture<TimeseriesSaveResult> saveFuture = tsService.save(
|
||||
TenantId.SYS_TENANT_ID,
|
||||
deviceId,
|
||||
Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), 0L);
|
||||
|
||||
@ -59,7 +59,6 @@ import org.thingsboard.server.service.cf.CalculatedFieldCache;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
import org.thingsboard.server.service.queue.DefaultTbCoreConsumerService.PendingMsgHolder;
|
||||
import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager;
|
||||
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
|
||||
import org.thingsboard.server.service.queue.processing.IdMsgPair;
|
||||
|
||||
@ -38,12 +38,10 @@ import org.thingsboard.server.common.data.TbResourceInfo;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.AssetProfileId;
|
||||
import org.thingsboard.server.common.data.id.CalculatedFieldId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
@ -347,6 +345,13 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
toCoreMsgs.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushMsgToCalculatedFields(TopicPartitionInfo tpi, UUID msgId, ToCalculatedFieldMsg msg, TbQueueCallback callback) {
|
||||
log.trace("PUSHING msg: {} to:{}", msg, tpi);
|
||||
producerProvider.getCalculatedFieldsMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback);
|
||||
toRuleEngineNfs.incrementAndGet(); // TODO: add separate counter when we will have new ServiceType.CALCULATED_FIELDS
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(CALCULATED_FIELD_QUEUE_KEY, entityId);
|
||||
@ -409,7 +414,7 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
public void onDeviceDeleted(TenantId tenantId, Device device, TbQueueCallback callback) {
|
||||
DeviceId deviceId = device.getId();
|
||||
gatewayNotificationsService.onDeviceDeleted(device);
|
||||
handleProfileEntityEvent(tenantId, deviceId, device.getDeviceProfileId(), false, true);
|
||||
handleCalculatedFieldEntityDeleted(tenantId, deviceId, device.getDeviceProfileId());
|
||||
broadcastEntityDeleteToTransport(tenantId, deviceId, device.getName(), callback);
|
||||
sendDeviceStateServiceEvent(tenantId, deviceId, false, false, true);
|
||||
broadcastEntityStateChangeEvent(tenantId, deviceId, ComponentLifecycleEvent.DELETED);
|
||||
@ -418,7 +423,7 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
@Override
|
||||
public void onAssetDeleted(TenantId tenantId, Asset asset, TbQueueCallback callback) {
|
||||
AssetId assetId = asset.getId();
|
||||
handleProfileEntityEvent(tenantId, assetId, asset.getAssetProfileId(), true, true);
|
||||
handleCalculatedFieldEntityDeleted(tenantId, assetId, asset.getAssetProfileId());
|
||||
broadcastEntityStateChangeEvent(tenantId, assetId, ComponentLifecycleEvent.DELETED);
|
||||
}
|
||||
|
||||
@ -653,13 +658,13 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
}
|
||||
boolean deviceProfileChanged = !device.getDeviceProfileId().equals(old.getDeviceProfileId());
|
||||
if (deviceProfileChanged) {
|
||||
handleEntityProfileUpdatedEvent(device.getTenantId(), device.getId(), old.getDeviceProfileId(), device.getDeviceProfileId());
|
||||
handleCalculatedFieldEntityUpdated(device.getTenantId(), device.getId(), old.getDeviceProfileId(), device.getDeviceProfileId());
|
||||
}
|
||||
if (deviceNameChanged || deviceProfileChanged) {
|
||||
pushMsgToCore(new DeviceNameOrTypeUpdateMsg(device.getTenantId(), device.getId(), device.getName(), device.getType()), null);
|
||||
}
|
||||
} else {
|
||||
handleProfileEntityEvent(device.getTenantId(), device.getId(), device.getDeviceProfileId(), true, false);
|
||||
handleCalculatedFieldEntityAdded(device.getTenantId(), device.getId(), device.getDeviceProfileId());
|
||||
}
|
||||
broadcastEntityStateChangeEvent(device.getTenantId(), device.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
|
||||
sendDeviceStateServiceEvent(device.getTenantId(), device.getId(), created, !created, false);
|
||||
@ -673,10 +678,10 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
if (old != null) {
|
||||
boolean assetTypeChanged = !asset.getType().equals(old.getType());
|
||||
if (assetTypeChanged) {
|
||||
handleEntityProfileUpdatedEvent(asset.getTenantId(), asset.getId(), old.getAssetProfileId(), asset.getAssetProfileId());
|
||||
handleCalculatedFieldEntityUpdated(asset.getTenantId(), asset.getId(), old.getAssetProfileId(), asset.getAssetProfileId());
|
||||
}
|
||||
} else {
|
||||
handleProfileEntityEvent(asset.getTenantId(), asset.getId(), asset.getAssetProfileId(), true, false);
|
||||
handleCalculatedFieldEntityAdded(asset.getTenantId(), asset.getId(), asset.getAssetProfileId());
|
||||
}
|
||||
broadcastEntityStateChangeEvent(asset.getTenantId(), asset.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
|
||||
}
|
||||
@ -809,52 +814,41 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField) {
|
||||
var created = oldCalculatedField == null;
|
||||
broadcastEntityStateChangeEvent(calculatedField.getTenantId(), calculatedField.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
|
||||
private void handleCalculatedFieldEntityAdded(TenantId tenantId, EntityId entityId, EntityId newProfileId) {
|
||||
handleCalculatedFieldEntityUpdateEvent(tenantId, entityId, null, newProfileId, true, false, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCalculatedFieldDeleted(TenantId tenantId, CalculatedField calculatedField, TbQueueCallback callback) {
|
||||
CalculatedFieldId calculatedFieldId = calculatedField.getId();
|
||||
broadcastEntityStateChangeEvent(tenantId, calculatedFieldId, ComponentLifecycleEvent.DELETED);
|
||||
private void handleCalculatedFieldEntityUpdated(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) {
|
||||
handleCalculatedFieldEntityUpdateEvent(tenantId, entityId, oldProfileId, newProfileId, false, true, true);
|
||||
}
|
||||
|
||||
private void handleEntityProfileUpdatedEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) {
|
||||
TransportProtos.EntityProfileUpdateMsgProto.Builder builder = TransportProtos.EntityProfileUpdateMsgProto.newBuilder();
|
||||
private void handleCalculatedFieldEntityDeleted(TenantId tenantId, EntityId entityId, EntityId oldProfileId) {
|
||||
handleCalculatedFieldEntityUpdateEvent(tenantId, entityId, oldProfileId, null, false, false, true);
|
||||
}
|
||||
|
||||
private void handleCalculatedFieldEntityUpdateEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId, boolean added, boolean updated, boolean deleted) {
|
||||
TransportProtos.CalculatedFieldEntityUpdateMsgProto.Builder builder = TransportProtos.CalculatedFieldEntityUpdateMsgProto.newBuilder();
|
||||
builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
|
||||
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
|
||||
builder.setEntityType(entityId.getEntityType().name());
|
||||
builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
|
||||
builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
|
||||
builder.setEntityProfileType(newProfileId.getEntityType().name());
|
||||
if (oldProfileId != null) {
|
||||
builder.setEntityProfileType(oldProfileId.getEntityType().name());
|
||||
builder.setOldProfileIdMSB(oldProfileId.getId().getMostSignificantBits());
|
||||
builder.setOldProfileIdLSB(oldProfileId.getId().getLeastSignificantBits());
|
||||
}
|
||||
if (newProfileId != null) {
|
||||
builder.setEntityProfileType(newProfileId.getEntityType().name());
|
||||
builder.setNewProfileIdMSB(newProfileId.getId().getMostSignificantBits());
|
||||
builder.setNewProfileIdLSB(newProfileId.getId().getLeastSignificantBits());
|
||||
TransportProtos.EntityProfileUpdateMsgProto msg = builder.build();
|
||||
|
||||
broadcastToCore(ToCoreNotificationMsg.newBuilder().setEntityProfileUpdateMsg(msg).build());
|
||||
pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setEntityProfileUpdateMsg(msg).build(), null);
|
||||
}
|
||||
|
||||
private void handleProfileEntityEvent(TenantId tenantId, EntityId entityId, EntityId profileId, boolean added, boolean deleted) {
|
||||
TransportProtos.ProfileEntityMsgProto.Builder builder = TransportProtos.ProfileEntityMsgProto.newBuilder();
|
||||
builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
|
||||
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
|
||||
builder.setEntityType(entityId.getEntityType().name());
|
||||
builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
|
||||
builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
|
||||
builder.setEntityProfileType(profileId.getEntityType().name());
|
||||
builder.setProfileIdMSB(profileId.getId().getMostSignificantBits());
|
||||
builder.setProfileIdLSB(profileId.getId().getLeastSignificantBits());
|
||||
builder.setAdded(added);
|
||||
builder.setUpdated(updated);
|
||||
builder.setDeleted(deleted);
|
||||
TransportProtos.ProfileEntityMsgProto msg = builder.build();
|
||||
TransportProtos.CalculatedFieldEntityUpdateMsgProto msg = builder.build();
|
||||
|
||||
broadcastToCore(ToCoreNotificationMsg.newBuilder().setProfileEntityMsg(msg).build());
|
||||
pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setProfileEntityMsg(msg).build(), null);
|
||||
pushNotificationToCalculatedFields(tenantId, entityId, ToCalculatedFieldNotificationMsg.newBuilder().setEntityUpdateMsg(msg).build(), null);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -39,8 +39,6 @@ import org.thingsboard.server.common.data.event.ErrorEvent;
|
||||
import org.thingsboard.server.common.data.event.Event;
|
||||
import org.thingsboard.server.common.data.event.LifecycleEvent;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
import org.thingsboard.server.common.data.id.NotificationRequestId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
@ -88,7 +86,6 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldCache;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
|
||||
import org.thingsboard.server.service.notification.NotificationSchedulerService;
|
||||
import org.thingsboard.server.service.ota.OtaPackageStateService;
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
@ -110,7 +107,6 @@ import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpd
|
||||
import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
@ -153,14 +149,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
private final TbImageService imageService;
|
||||
private final RuleEngineCallService ruleEngineCallService;
|
||||
private final TbCoreConsumerStats stats;
|
||||
private final CalculatedFieldExecutionService calculatedFieldExecutionService;
|
||||
|
||||
private MainQueueConsumerManager<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig> mainConsumer;
|
||||
private QueueConsumerManager<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
|
||||
private QueueConsumerManager<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> firmwareStatesConsumer;
|
||||
|
||||
private volatile ListeningExecutorService deviceActivityEventsExecutor;
|
||||
private volatile ListeningExecutorService calculatedFieldsExecutor;
|
||||
|
||||
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory,
|
||||
ActorSystemContext actorContext,
|
||||
@ -183,7 +177,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
NotificationRuleProcessor notificationRuleProcessor,
|
||||
TbImageService imageService,
|
||||
RuleEngineCallService ruleEngineCallService,
|
||||
CalculatedFieldExecutionService calculatedFieldExecutionService,
|
||||
CalculatedFieldCache calculatedFieldCache) {
|
||||
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, calculatedFieldCache, apiUsageStateService, partitionService,
|
||||
eventPublisher, jwtSettingsService);
|
||||
@ -200,14 +193,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
this.imageService = imageService;
|
||||
this.ruleEngineCallService = ruleEngineCallService;
|
||||
this.queueFactory = tbCoreQueueFactory;
|
||||
this.calculatedFieldExecutionService = calculatedFieldExecutionService;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
super.init("tb-core");
|
||||
this.deviceActivityEventsExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-device-activity-events-executor")));
|
||||
this.calculatedFieldsExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-calculated-fields-executor")));
|
||||
|
||||
this.mainConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<ToCoreMsg>, CoreQueueConfig>builder()
|
||||
.queueKey(new QueueKey(ServiceType.TB_CORE))
|
||||
@ -319,12 +310,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
forwardToEventService(toCoreMsg.getErrorEventMsg(), callback);
|
||||
} else if (toCoreMsg.hasLifecycleEventMsg()) {
|
||||
forwardToEventService(toCoreMsg.getLifecycleEventMsg(), callback);
|
||||
} else if (toCoreMsg.hasCalculatedFieldMsg()) {
|
||||
forwardToCalculatedFieldService(toCoreMsg.getCalculatedFieldMsg(), callback);
|
||||
} else if (toCoreMsg.hasEntityProfileUpdateMsg()) {
|
||||
forwardToCalculatedFieldService(toCoreMsg.getEntityProfileUpdateMsg(), callback);
|
||||
} else if (toCoreMsg.hasProfileEntityMsg()) {
|
||||
forwardToCalculatedFieldService(toCoreMsg.getProfileEntityMsg(), callback);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}] Failed to process message: {}", id, msg, e);
|
||||
@ -406,10 +391,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
callback.onSuccess();
|
||||
} else if (toCoreNotification.hasResourceCacheInvalidateMsg()) {
|
||||
forwardToResourceService(toCoreNotification.getResourceCacheInvalidateMsg(), callback);
|
||||
} else if (toCoreNotification.hasEntityProfileUpdateMsg()) {
|
||||
processEntityProfileUpdateMsg(toCoreNotification.getEntityProfileUpdateMsg());
|
||||
} else if (toCoreNotification.hasProfileEntityMsg()) {
|
||||
processProfileEntityMsg(toCoreNotification.getProfileEntityMsg());
|
||||
}
|
||||
if (statsEnabled) {
|
||||
stats.log(toCoreNotification);
|
||||
@ -528,28 +509,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
private void processEntityProfileUpdateMsg(TransportProtos.EntityProfileUpdateMsgProto profileUpdateMsg) {
|
||||
var tenantId = toTenantId(profileUpdateMsg.getTenantIdMSB(), profileUpdateMsg.getTenantIdLSB());
|
||||
var entityId = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityType(), new UUID(profileUpdateMsg.getEntityIdMSB(), profileUpdateMsg.getEntityIdLSB()));
|
||||
var oldProfile = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getOldProfileIdMSB(), profileUpdateMsg.getOldProfileIdLSB()));
|
||||
var newProfile = EntityIdFactory.getByTypeAndUuid(profileUpdateMsg.getEntityProfileType(), new UUID(profileUpdateMsg.getNewProfileIdMSB(), profileUpdateMsg.getNewProfileIdLSB()));
|
||||
calculatedFieldCache.getEntitiesByProfile(tenantId, oldProfile).remove(entityId);
|
||||
calculatedFieldCache.getEntitiesByProfile(tenantId, newProfile).add(entityId);
|
||||
}
|
||||
|
||||
private void processProfileEntityMsg(TransportProtos.ProfileEntityMsgProto profileEntityMsg) {
|
||||
var tenantId = toTenantId(profileEntityMsg.getTenantIdMSB(), profileEntityMsg.getTenantIdLSB());
|
||||
var entityId = EntityIdFactory.getByTypeAndUuid(profileEntityMsg.getEntityType(), new UUID(profileEntityMsg.getEntityIdMSB(), profileEntityMsg.getEntityIdLSB()));
|
||||
var profileId = EntityIdFactory.getByTypeAndUuid(profileEntityMsg.getEntityProfileType(), new UUID(profileEntityMsg.getProfileIdMSB(), profileEntityMsg.getProfileIdLSB()));
|
||||
boolean added = profileEntityMsg.getAdded();
|
||||
Set<EntityId> entitiesByProfile = calculatedFieldCache.getEntitiesByProfile(tenantId, profileId);
|
||||
if (added) {
|
||||
entitiesByProfile.add(entityId);
|
||||
} else {
|
||||
entitiesByProfile.remove(entityId);
|
||||
}
|
||||
}
|
||||
|
||||
private void forwardToSubMgrService(SubscriptionMgrMsgProto msg, TbCallback callback) {
|
||||
if (msg.hasSubEvent()) {
|
||||
TbEntitySubEventProto subEvent = msg.getSubEvent();
|
||||
|
||||
@ -178,12 +178,16 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
||||
}
|
||||
} else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
deviceProfileCache.evict(tenantId, new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
|
||||
calculatedFieldCache.evictProfile(tenantId, new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
|
||||
} else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
deviceProfileCache.evict(tenantId, new DeviceId(componentLifecycleMsg.getEntityId().getId()));
|
||||
calculatedFieldCache.evictEntity(tenantId, new DeviceId(componentLifecycleMsg.getEntityId().getId()));
|
||||
} else if (EntityType.ASSET_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
assetProfileCache.evict(tenantId, new AssetProfileId(componentLifecycleMsg.getEntityId().getId()));
|
||||
calculatedFieldCache.evictProfile(tenantId, new AssetProfileId(componentLifecycleMsg.getEntityId().getId()));
|
||||
} else if (EntityType.ASSET.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
assetProfileCache.evict(tenantId, new AssetId(componentLifecycleMsg.getEntityId().getId()));
|
||||
calculatedFieldCache.evictEntity(tenantId, new AssetId(componentLifecycleMsg.getEntityId().getId()));
|
||||
} else if (EntityType.ENTITY_VIEW.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
actorContext.getTbEntityViewService().onComponentLifecycleMsg(componentLifecycleMsg);
|
||||
} else if (EntityType.API_USAGE_STATE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
|
||||
@ -28,14 +28,12 @@ import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
|
||||
import org.thingsboard.server.common.msg.queue.RuleEngineException;
|
||||
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
|
||||
import org.thingsboard.server.service.queue.TbMsgPackCallback;
|
||||
import org.thingsboard.server.service.queue.TbMsgPackProcessingContext;
|
||||
import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats;
|
||||
@ -65,18 +63,14 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
|
||||
private final TbRuleEngineConsumerContext ctx;
|
||||
private final TbRuleEngineConsumerStats stats;
|
||||
|
||||
private final CalculatedFieldExecutionService calculatedFieldExecutionService;
|
||||
|
||||
@Builder(builderMethodName = "create") // not to conflict with super.builder()
|
||||
public TbRuleEngineQueueConsumerManager(TbRuleEngineConsumerContext ctx,
|
||||
QueueKey queueKey,
|
||||
ExecutorService consumerExecutor,
|
||||
ScheduledExecutorService scheduler,
|
||||
ExecutorService taskExecutor,
|
||||
CalculatedFieldExecutionService calculatedFieldExecutionService) {
|
||||
ExecutorService taskExecutor) {
|
||||
super(queueKey, null, null, ctx.getQueueFactory()::createToRuleEngineMsgConsumer, consumerExecutor, scheduler, taskExecutor);
|
||||
this.ctx = ctx;
|
||||
this.calculatedFieldExecutionService = calculatedFieldExecutionService;
|
||||
this.stats = new TbRuleEngineConsumerStats(queueKey, ctx.getStatsFactory());
|
||||
}
|
||||
|
||||
@ -178,12 +172,6 @@ public class TbRuleEngineQueueConsumerManager extends MainQueueConsumerManager<T
|
||||
try {
|
||||
if (!toRuleEngineMsg.getTbMsg().isEmpty()) {
|
||||
forwardToRuleEngineActor(config.getName(), tenantId, toRuleEngineMsg, callback);
|
||||
} else if (toRuleEngineMsg.hasCfTelemetryUpdateMsg()) {
|
||||
calculatedFieldExecutionService.onTelemetryUpdateMsg(toRuleEngineMsg.getCfTelemetryUpdateMsg());
|
||||
} else if (toRuleEngineMsg.hasEntityProfileUpdateMsg()) {
|
||||
calculatedFieldExecutionService.onEntityProfileChangedMsg(toRuleEngineMsg.getEntityProfileUpdateMsg(), TbCallback.EMPTY);
|
||||
} else if (toRuleEngineMsg.hasProfileEntityMsg()) {
|
||||
calculatedFieldExecutionService.onProfileEntityMsg(toRuleEngineMsg.getProfileEntityMsg(), TbCallback.EMPTY);
|
||||
} else {
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
@ -51,7 +51,6 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.dao.util.KvUtils;
|
||||
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
|
||||
import org.thingsboard.server.service.cf.telemetry.CalculatedFieldAttributeUpdateRequest;
|
||||
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
|
||||
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
|
||||
|
||||
@ -171,7 +170,6 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
|
||||
calculatedFieldExecutionService.pushRequestToQueue(request, result);
|
||||
}, safeCallback(request.getCallback()), tsCallBackExecutor);
|
||||
addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice()));
|
||||
addCallback(saveFuture, success -> calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldAttributeUpdateRequest(request)), tsCallBackExecutor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -22,7 +22,6 @@ import org.thingsboard.server.common.data.TbResourceInfo;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.cf.CalculatedField;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
@ -40,6 +39,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.RestApiCallResponseMsgProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeMsg;
|
||||
@ -79,6 +79,8 @@ public interface TbClusterService extends TbQueueClusterService {
|
||||
|
||||
void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, TransportProtos.ToCalculatedFieldMsg msg, TbQueueCallback callback);
|
||||
|
||||
void pushMsgToCalculatedFields(TopicPartitionInfo tpi, UUID msgId, ToCalculatedFieldMsg msg, TbQueueCallback callback);
|
||||
|
||||
void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, TransportProtos.ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback);
|
||||
|
||||
void broadcastEntityStateChangeEvent(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent state);
|
||||
@ -125,8 +127,4 @@ public interface TbClusterService extends TbQueueClusterService {
|
||||
|
||||
void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action, EdgeId sourceEdgeId);
|
||||
|
||||
void onCalculatedFieldUpdated(CalculatedField calculatedField, CalculatedField oldCalculatedField);
|
||||
|
||||
void onCalculatedFieldDeleted(TenantId tenantId, CalculatedField calculatedField, TbQueueCallback callback);
|
||||
|
||||
}
|
||||
|
||||
@ -58,6 +58,7 @@ import org.thingsboard.server.common.data.id.TenantProfileId;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKey;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.JsonDataEntry;
|
||||
@ -628,6 +629,20 @@ public class ProtoUtils {
|
||||
return new BaseAttributeKvEntry(entry, proto.getLastUpdateTs(), proto.hasVersion() ? proto.getVersion() : null);
|
||||
}
|
||||
|
||||
public static TsKvEntry fromProto(TransportProtos.TsKvProto proto) {
|
||||
TransportProtos.KeyValueProto kvProto = proto.getKv();
|
||||
String key = kvProto.getKey();
|
||||
KvEntry entry = switch (kvProto.getType()) {
|
||||
case BOOLEAN_V -> new BooleanDataEntry(key, kvProto.getBoolV());
|
||||
case LONG_V -> new LongDataEntry(key, kvProto.getLongV());
|
||||
case DOUBLE_V -> new DoubleDataEntry(key, kvProto.getDoubleV());
|
||||
case STRING_V -> new StringDataEntry(key, kvProto.getStringV());
|
||||
case JSON_V -> new JsonDataEntry(key, kvProto.getJsonV());
|
||||
default -> null;
|
||||
};
|
||||
return new BasicTsKvEntry(proto.getTs(), entry, proto.hasVersion() ? proto.getVersion() : null);
|
||||
}
|
||||
|
||||
public static TransportProtos.TsKvProto toTsKvProto(TsKvEntry tsKvEntry) {
|
||||
return TransportProtos.TsKvProto.newBuilder()
|
||||
.setTs(tsKvEntry.getTs())
|
||||
|
||||
@ -1591,8 +1591,6 @@ message ToCoreMsg {
|
||||
DeviceConnectProto deviceConnectMsg = 50;
|
||||
DeviceDisconnectProto deviceDisconnectMsg = 51;
|
||||
DeviceInactivityProto deviceInactivityMsg = 52;
|
||||
// CalculatedFieldMsgProto calculatedFieldMsg = 53;
|
||||
// EntityProfileUpdateMsgProto entityProfileUpdateMsg = 54;
|
||||
}
|
||||
|
||||
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */
|
||||
@ -1612,8 +1610,6 @@ message ToCoreNotificationMsg {
|
||||
FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 12 [deprecated = true];
|
||||
ResourceCacheInvalidateMsg resourceCacheInvalidateMsg = 13;
|
||||
RestApiCallResponseMsgProto restApiCallResponseMsg = 50;
|
||||
// EntityProfileUpdateMsgProto entityProfileUpdateMsg = 51;
|
||||
// ProfileEntityMsgProto profileEntityMsg = 52;
|
||||
}
|
||||
|
||||
/* Messages to Edge queue that are handled by ThingsBoard Core Service */
|
||||
|
||||
@ -46,6 +46,7 @@ import org.thingsboard.server.common.data.kv.DoubleDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.StringDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.query.ApiUsageStateFilter;
|
||||
import org.thingsboard.server.common.data.query.AssetSearchQueryFilter;
|
||||
@ -1824,7 +1825,7 @@ public class EntityServiceTest extends AbstractServiceTest {
|
||||
}
|
||||
}
|
||||
|
||||
List<ListenableFuture<Integer>> timeseriesFutures = new ArrayList<>();
|
||||
List<ListenableFuture<TimeseriesSaveResult>> timeseriesFutures = new ArrayList<>();
|
||||
for (int i = 0; i < devices.size(); i++) {
|
||||
Device device = devices.get(i);
|
||||
timeseriesFutures.add(saveLongTimeseries(device.getId(), "temperature", temperatures.get(i)));
|
||||
@ -2430,7 +2431,7 @@ public class EntityServiceTest extends AbstractServiceTest {
|
||||
return attributesService.save(SYSTEM_TENANT_ID, entityId, scope, Collections.singletonList(attr));
|
||||
}
|
||||
|
||||
private ListenableFuture<Integer> saveLongTimeseries(EntityId entityId, String key, Double value) {
|
||||
private ListenableFuture<TimeseriesSaveResult> saveLongTimeseries(EntityId entityId, String key, Double value) {
|
||||
TsKvEntity tsKv = new TsKvEntity();
|
||||
tsKv.setStrKey(key);
|
||||
tsKv.setDoubleValue(value);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user