Tmp commit for merge

This commit is contained in:
Andrii Shvaika 2025-01-22 12:23:42 +02:00
parent f0a71aa3bd
commit 6b9d374a5f
9 changed files with 87 additions and 58 deletions

View File

@ -21,6 +21,17 @@ import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTelemetryUpdat
public interface CalculatedFieldExecutionService {
/**
* Push incoming telemetry to the CF processing queue for async processing.
* @param request - telemetry request;
* @param callback - callback to be executed when the message is ack by the queue.
*/
void pushRequestToQueue(CalculatedFieldTelemetryUpdateRequest request, TbCallback callback);
void pushEntityUpdateMsg(TransportProtos.CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback);
/* ===================================================== */
void onCalculatedFieldMsg(TransportProtos.CalculatedFieldMsgProto proto, TbCallback callback);
void onTelemetryUpdate(CalculatedFieldTelemetryUpdateRequest calculatedFieldTelemetryUpdateRequest);

View File

@ -0,0 +1,8 @@
package org.thingsboard.server.service.queue;
import org.springframework.context.ApplicationListener;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
public interface TbCalculatedFieldConsumerService extends ApplicationListener<PartitionChangeEvent> {
}

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.telemetry;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -128,8 +129,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
KvUtils.validate(request.getEntries(), valueNoXssValidation);
ListenableFuture<Integer> future = saveTimeseriesInternal(request);
if (!request.isOnlyLatest()) {
FutureCallback<Integer> callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback());
Futures.addCallback(future, callback, tsCallBackExecutor);
Futures.addCallback(future, getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant), tsCallBackExecutor);
}
} else {
request.getCallback().onFailure(new RuntimeException("DB storage writes are disabled due to API limits!"));
@ -148,7 +148,14 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
} else {
saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
}
// We need to guarantee, that the message is successfully pushed to the calculated fields service before we execute any callbacks.
saveFuture = Futures.transformAsync(saveFuture, new AsyncFunction<Integer, Integer>() {
@Override
public ListenableFuture<Integer> apply(Integer input) throws Exception {
calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(request));
return input;
}
});
addMainCallback(saveFuture, request.getCallback());
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
if (request.isSaveLatest() && !request.isOnlyLatest()) {
@ -326,19 +333,18 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
}
private FutureCallback<Integer> getApiUsageCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant, FutureCallback<Void> callback) {
private FutureCallback<Integer> getApiUsageCallback(TenantId tenantId, CustomerId customerId, boolean sysTenant) {
return new FutureCallback<>() {
@Override
public void onSuccess(Integer result) {
if (!sysTenant && result != null && result > 0) {
apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result);
}
callback.onSuccess(null);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
};
}

View File

@ -1692,7 +1692,6 @@ queue:
enabled: "${TB_HOUSEKEEPER_STATS_ENABLED:true}"
# Statistics printing interval for Housekeeper
print-interval-ms: "${TB_HOUSEKEEPER_STATS_PRINT_INTERVAL_MS:60000}"
vc:
# Default topic name
topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}"
@ -1739,6 +1738,17 @@ queue:
topic-deletion-delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SEC:15}"
# Size of the thread pool that handles such operations as partition changes, config updates, queue deletion
management-thread-pool-size: "${TB_QUEUE_RULE_ENGINE_MGMT_THREAD_POOL_SIZE:12}"
calculated-fields:
# Topic name for Calculated Field (CF) tasks
topic: "${TB_QUEUE_CF_TOPIC:tb_calculated_fields}"
# Interval in milliseconds to poll messages by CF (Rule Engine) microservices
poll-interval: "${TB_QUEUE_CF_POLL_INTERVAL_MS:25}"
# Amount of partitions used by CF microservices
partitions: "${TB_QUEUE_CF_PARTITIONS:10}"
# Timeout for processing a message pack by CF microservices
pack-processing-timeout: "${TB_QUEUE_CF_PACK_PROCESSING_TIMEOUT_MS:2000}"
# Enable/disable a separate consumer per partition for CF queue
consumer-per-partition: "${TB_QUEUE_CF_CONSUMER_PER_PARTITION:true}"
transport:
# For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"

View File

@ -145,4 +145,6 @@ public class DataConstants {
public static final String EDGE_QUEUE_NAME = "Edge";
public static final String EDGE_EVENT_QUEUE_NAME = "EdgeEvent";
public static final String CF_QUEUE_NAME = "CalculatedFields";
}

View File

@ -26,7 +26,8 @@ public enum ServiceType {
TB_RULE_ENGINE("TB Rule Engine"),
TB_TRANSPORT("TB Transport"),
JS_EXECUTOR("JS Executor"),
TB_VC_EXECUTOR("TB VC Executor");
TB_VC_EXECUTOR("TB VC Executor"),
TB_CF_ENGINE("TB Calculated Fields Engine");
private final String label;

View File

@ -183,18 +183,6 @@ message TsKvListProto {
repeated KeyValueProto kv = 2;
}
message AttributeKvProto {
AttributeKey key = 1;
AttributeValueProto value = 2;
}
message TelemetryProto {
oneof proto {
AttributeKvProto attrKv = 1;
TsKvProto tsKv = 2;
}
}
message DeviceInfoProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
@ -785,17 +773,7 @@ message DeviceInactivityProto {
int64 lastInactivityTime = 5;
}
message CalculatedFieldMsgProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 calculatedFieldIdMSB = 3;
int64 calculatedFieldIdLSB = 4;
bool added = 5;
bool updated = 6;
bool deleted = 7;
}
message EntityProfileUpdateMsgProto {
message CalculatedFieldEntityUpdateMsgProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
string entityType = 3;
@ -806,31 +784,26 @@ message EntityProfileUpdateMsgProto {
int64 oldProfileIdLSB = 8;
int64 newProfileIdMSB = 9;
int64 newProfileIdLSB = 10;
bool added = 11;
bool updated = 12;
bool deleted = 13;
}
message ProfileEntityMsgProto {
message CalculatedFieldTelemetryMsgProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
string entityType = 3;
int64 entityIdMSB = 4;
int64 entityIdLSB = 5;
string entityProfileType = 6;
int64 profileIdMSB = 7;
int64 profileIdLSB = 8;
bool added = 9;
bool deleted = 10;
}
message TelemetryUpdateMsgProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
string entityType = 3;
int64 entityIdMSB = 4;
int64 entityIdLSB = 5;
repeated CalculatedFieldEntityCtxIdProto links = 6;
repeated CalculatedFieldIdProto previousCalculatedFields = 7;
string scope = 8;
repeated TelemetryProto updatedTelemetry = 9;
repeated TsKvProto tsData = 9;
AttributeScopeProto scope = 10;
repeated AttributeValueProto attrData = 11;
}
message CalculatedFieldLinkedTelemetryMsgProto {
CalculatedFieldTelemetryMsgProto msg = 1;
repeated CalculatedFieldEntityCtxIdProto links = 2;
}
message CalculatedFieldEntityCtxIdProto {
@ -1589,9 +1562,8 @@ message ToCoreMsg {
DeviceConnectProto deviceConnectMsg = 50;
DeviceDisconnectProto deviceDisconnectMsg = 51;
DeviceInactivityProto deviceInactivityMsg = 52;
CalculatedFieldMsgProto calculatedFieldMsg = 53;
EntityProfileUpdateMsgProto entityProfileUpdateMsg = 54;
ProfileEntityMsgProto profileEntityMsg = 55;
// CalculatedFieldMsgProto calculatedFieldMsg = 53;
// EntityProfileUpdateMsgProto entityProfileUpdateMsg = 54;
}
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */
@ -1611,8 +1583,8 @@ message ToCoreNotificationMsg {
FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 12 [deprecated = true];
ResourceCacheInvalidateMsg resourceCacheInvalidateMsg = 13;
RestApiCallResponseMsgProto restApiCallResponseMsg = 50;
EntityProfileUpdateMsgProto entityProfileUpdateMsg = 51;
ProfileEntityMsgProto profileEntityMsg = 52;
// EntityProfileUpdateMsgProto entityProfileUpdateMsg = 51;
// ProfileEntityMsgProto profileEntityMsg = 52;
}
/* Messages to Edge queue that are handled by ThingsBoard Core Service */
@ -1632,6 +1604,16 @@ message ToEdgeEventNotificationMsg {
EdgeEventMsgProto edgeEventMsg = 1;
}
message ToCalculatedFieldMsg {
CalculatedFieldTelemetryMsgProto telemetryMsg = 1;
CalculatedFieldLinkedTelemetryMsgProto linkedTelemetryMsg = 2;
}
message ToCalculatedFieldNotificationMsg {
ComponentLifecycleMsgProto componentLifecycle = 1;
CalculatedFieldEntityUpdateMsgProto entityUpdateMsg = 2;
}
/* Messages that are handled by ThingsBoard RuleEngine Service */
message ToRuleEngineMsg {
int64 tenantIdMSB = 1;
@ -1639,9 +1621,6 @@ message ToRuleEngineMsg {
bytes tbMsg = 3;
repeated string relationTypes = 4;
string failureMessage = 5;
TelemetryUpdateMsgProto cfTelemetryUpdateMsg = 6;
EntityProfileUpdateMsgProto entityProfileUpdateMsg = 7;
ProfileEntityMsgProto profileEntityMsg = 8;
}
message ToRuleEngineNotificationMsg {

View File

@ -51,8 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.DataConstants.EDGE_QUEUE_NAME;
import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME;
import static org.thingsboard.server.common.data.DataConstants.*;
@Service
@Slf4j
@ -62,6 +61,10 @@ public class HashPartitionService implements PartitionService {
private String coreTopic;
@Value("${queue.core.partitions:10}")
private Integer corePartitions;
@Value("${queue.calculated-fields.topic}")
private String cfTopic;
@Value("${queue.calculated-fields.partitions:10}")
private Integer cfPartitions;
@Value("${queue.vc.topic:tb_version_control}")
private String vcTopic;
@Value("${queue.vc.partitions:10}")
@ -108,10 +111,15 @@ public class HashPartitionService implements PartitionService {
@PostConstruct
public void init() {
this.hashFunction = forName(hashFunctionName);
QueueKey coreKey = new QueueKey(ServiceType.TB_CORE);
partitionSizesMap.put(coreKey, corePartitions);
partitionTopicsMap.put(coreKey, coreTopic);
QueueKey cfKey = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_QUEUE_NAME);
partitionSizesMap.put(cfKey, cfPartitions);
partitionTopicsMap.put(cfKey, cfTopic);
QueueKey vcKey = new QueueKey(ServiceType.TB_VC_EXECUTOR);
partitionSizesMap.put(vcKey, vcPartitions);
partitionTopicsMap.put(vcKey, vcTopic);

View File

@ -52,6 +52,10 @@ public class PartitionChangeEvent extends TbApplicationEvent {
return getPartitionsByServiceTypeAndQueueName(ServiceType.TB_CORE, DataConstants.EDGE_QUEUE_NAME);
}
public Set<TopicPartitionInfo> getCalculatedFieldsPartitions() {
return getPartitionsByServiceTypeAndQueueName(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
}
private Set<TopicPartitionInfo> getPartitionsByServiceTypeAndQueueName(ServiceType serviceType, String queueName) {
return partitionsMap.entrySet()
.stream()