WIP: CalculatedFieldConsumer refactoring

This commit is contained in:
Andrii Shvaika 2025-01-24 15:45:00 +02:00
parent 85119d0247
commit c332e7373f
6 changed files with 133 additions and 21 deletions

View File

@ -20,6 +20,8 @@ import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto;
import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTelemetryUpdateRequest;
@ -36,6 +38,10 @@ public interface CalculatedFieldExecutionService {
void pushRequestToQueue(AttributesSaveRequest request, List<Long> result);
void onTelemetryMsg(CalculatedFieldTelemetryMsgProto msg, TbCallback callback);
void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback);
// void pushEntityUpdateMsg(TransportProtos.CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback);
/* ===================================================== */

View File

@ -75,6 +75,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleEvent;
import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto;
@ -234,6 +235,18 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
return send;
}
@Override
public void onTelemetryMsg(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) {
callback.onSuccess();
}
@Override
public void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback) {
callback.onSuccess();
}
@Override
protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>();

View File

@ -21,6 +21,8 @@ import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
@ -32,12 +34,19 @@ import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
@ -50,12 +59,20 @@ import org.thingsboard.server.service.cf.CalculatedFieldCache;
import org.thingsboard.server.service.cf.CalculatedFieldExecutionService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.DefaultTbCoreConsumerService.PendingMsgHolder;
import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Service
@TbRuleEngineComponent
@ -132,7 +149,46 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
}
private void processMsgs(List<TbProtoQueueMsg<ToCalculatedFieldMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumer, CalculatedFieldQueueConfig config) throws Exception {
List<IdMsgPair<ToCalculatedFieldMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList();
ConcurrentMap<UUID, TbProtoQueueMsg<ToCalculatedFieldMsg>> pendingMap = orderedMsgList.stream().collect(
Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<ToCalculatedFieldMsg>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
PendingMsgHolder<ToCalculatedFieldMsg> pendingMsgHolder = new PendingMsgHolder<>();
Future<?> packSubmitFuture = consumersExecutor.submit(() -> {
orderedMsgList.forEach((element) -> {
UUID id = element.getUuid();
TbProtoQueueMsg<ToCalculatedFieldMsg> msg = element.getMsg();
log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
TbCallback callback = new TbPackCallback<>(id, ctx);
try {
ToCalculatedFieldMsg toCfMsg = msg.getValue();
pendingMsgHolder.setMsg(toCfMsg);
if (toCfMsg.hasTelemetryMsg()) {
log.trace("[{}] Forwarding regular telemetry message for processing {}", id, toCfMsg.getTelemetryMsg());
forwardToCalculatedFieldService(toCfMsg.getTelemetryMsg(), callback);
} else if (toCfMsg.hasLinkedTelemetryMsg()) {
log.trace("[{}] Forwarding linked telemetry message for processing {}", id, toCfMsg.getLinkedTelemetryMsg());
forwardToCalculatedFieldService(toCfMsg.getLinkedTelemetryMsg(), callback);
}
} catch (Throwable e) {
log.warn("[{}] Failed to process message: {}", id, msg, e);
callback.onFailure(e);
}
});
});
if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
if (!packSubmitFuture.isDone()) {
packSubmitFuture.cancel(true);
log.info("Timeout to process message: {}", pendingMsgHolder.getMsg());
}
if (log.isDebugEnabled()) {
ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
}
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
}
consumer.commit();
}
@Override
@ -193,6 +249,33 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
// }
// }
//
private void forwardToCalculatedFieldService(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback) {
var msg = linkedMsg.getMsg();
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onLinkedTelemetryMsg(linkedMsg, callback));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process calculated field message for calculated field [{}]", tenantId.getId(), calculatedFieldId.getId(), t);
callback.onFailure(t);
});
}
private void forwardToCalculatedFieldService(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) {
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));
ListenableFuture<?> future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onTelemetryMsg(msg, callback));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process calculated field message for calculated field [{}]", tenantId.getId(), calculatedFieldId.getId(), t);
callback.onFailure(t);
});
}
private void forwardToCalculatedFieldService(TransportProtos.ComponentLifecycleMsgProto msg, TbCallback callback) {
var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()));

View File

@ -269,7 +269,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<ToCoreMsg>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
PendingMsgHolder<ToCoreMsg> pendingMsgHolder = new PendingMsgHolder<>();
Future<?> packSubmitFuture = consumersExecutor.submit(() -> {
orderedMsgList.forEach((element) -> {
UUID id = element.getUuid();
@ -278,7 +278,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
TbCallback callback = new TbPackCallback<>(id, ctx);
try {
ToCoreMsg toCoreMsg = msg.getValue();
pendingMsgHolder.setToCoreMsg(toCoreMsg);
pendingMsgHolder.setMsg(toCoreMsg);
if (toCoreMsg.hasToSubscriptionMgrMsg()) {
log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
@ -335,8 +335,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
if (!packSubmitFuture.isDone()) {
packSubmitFuture.cancel(true);
ToCoreMsg lastSubmitMsg = pendingMsgHolder.getToCoreMsg();
log.info("Timeout to process message: {}", lastSubmitMsg);
log.info("Timeout to process message: {}", pendingMsgHolder.getMsg());
}
if (log.isDebugEnabled()) {
ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue()));
@ -346,12 +345,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
consumer.commit();
}
private static class PendingMsgHolder {
@Getter
@Setter
private volatile ToCoreMsg toCoreMsg;
}
@Override
protected ServiceType getServiceType() {
return ServiceType.TB_CORE;

View File

@ -137,7 +137,7 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService<ToEdge
CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<ToEdgeMsg>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
PendingMsgHolder<ToEdgeMsg> pendingMsgHolder = new PendingMsgHolder<>();
Future<?> submitFuture = consumersExecutor.submit(() -> {
orderedMsgList.forEach((element) -> {
UUID id = element.getUuid();
@ -145,7 +145,7 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService<ToEdge
TbCallback callback = new TbPackCallback<>(id, ctx);
try {
ToEdgeMsg toEdgeMsg = msg.getValue();
pendingMsgHolder.setToEdgeMsg(toEdgeMsg);
pendingMsgHolder.setMsg(toEdgeMsg);
if (toEdgeMsg.hasEdgeNotificationMsg()) {
pushNotificationToEdge(toEdgeMsg.getEdgeNotificationMsg(), 0, packProcessingRetries, callback);
}
@ -161,20 +161,13 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService<ToEdge
if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) {
if (!submitFuture.isDone()) {
submitFuture.cancel(true);
ToEdgeMsg lastSubmitMsg = pendingMsgHolder.getToEdgeMsg();
log.info("Timeout to process message: {}", lastSubmitMsg);
log.info("Timeout to process message: {}", pendingMsgHolder.getMsg());
}
ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue()));
}
consumer.commit();
}
private static class PendingMsgHolder {
@Getter
@Setter
private volatile ToEdgeMsg toEdgeMsg;
}
@Override
protected ServiceType getServiceType() {
return ServiceType.TB_CORE;

View File

@ -0,0 +1,24 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import lombok.Getter;
import lombok.Setter;
public class PendingMsgHolder<T> {
@Getter @Setter
private volatile T msg;
}