diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index 93a0ecc75f..893dba92a9 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -20,6 +20,8 @@ import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto; import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTelemetryUpdateRequest; @@ -36,6 +38,10 @@ public interface CalculatedFieldExecutionService { void pushRequestToQueue(AttributesSaveRequest request, List result); + void onTelemetryMsg(CalculatedFieldTelemetryMsgProto msg, TbCallback callback); + + void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback); + // void pushEntityUpdateMsg(TransportProtos.CalculatedFieldEntityUpdateMsgProto proto, TbCallback callback); /* ===================================================== */ diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index d46a7515ef..2638f22254 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -75,6 +75,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityUpdateMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleEvent; import org.thingsboard.server.gen.transport.TransportProtos.ComponentLifecycleMsgProto; @@ -234,6 +235,18 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return send; } + @Override + public void onTelemetryMsg(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) { + + callback.onSuccess(); + } + + @Override + public void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback) { + + callback.onSuccess(); + } + @Override protected Map>> onAddedPartitions(Set addedPartitions) { var result = new HashMap>>(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java index 19d6e295b5..b36b34dd94 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCalculatedFieldConsumerService.java @@ -21,6 +21,8 @@ import com.google.common.util.concurrent.MoreExecutors; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Data; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; @@ -32,12 +34,19 @@ import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.QueueConfig; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -50,12 +59,20 @@ import org.thingsboard.server.service.cf.CalculatedFieldCache; import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import org.thingsboard.server.service.queue.DefaultTbCoreConsumerService.PendingMsgHolder; import org.thingsboard.server.service.queue.consumer.MainQueueConsumerManager; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; +import org.thingsboard.server.service.queue.processing.IdMsgPair; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; import java.util.List; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; @Service @TbRuleEngineComponent @@ -132,7 +149,46 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer } private void processMsgs(List> msgs, TbQueueConsumer> consumer, CalculatedFieldQueueConfig config) throws Exception { - + List> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList(); + ConcurrentMap> pendingMap = orderedMsgList.stream().collect( + Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg)); + CountDownLatch processingTimeoutLatch = new CountDownLatch(1); + TbPackProcessingContext> ctx = new TbPackProcessingContext<>( + processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>()); + PendingMsgHolder pendingMsgHolder = new PendingMsgHolder<>(); + Future packSubmitFuture = consumersExecutor.submit(() -> { + orderedMsgList.forEach((element) -> { + UUID id = element.getUuid(); + TbProtoQueueMsg msg = element.getMsg(); + log.trace("[{}] Creating main callback for message: {}", id, msg.getValue()); + TbCallback callback = new TbPackCallback<>(id, ctx); + try { + ToCalculatedFieldMsg toCfMsg = msg.getValue(); + pendingMsgHolder.setMsg(toCfMsg); + if (toCfMsg.hasTelemetryMsg()) { + log.trace("[{}] Forwarding regular telemetry message for processing {}", id, toCfMsg.getTelemetryMsg()); + forwardToCalculatedFieldService(toCfMsg.getTelemetryMsg(), callback); + } else if (toCfMsg.hasLinkedTelemetryMsg()) { + log.trace("[{}] Forwarding linked telemetry message for processing {}", id, toCfMsg.getLinkedTelemetryMsg()); + forwardToCalculatedFieldService(toCfMsg.getLinkedTelemetryMsg(), callback); + } + } catch (Throwable e) { + log.warn("[{}] Failed to process message: {}", id, msg, e); + callback.onFailure(e); + } + }); + }); + if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { + if (!packSubmitFuture.isDone()) { + packSubmitFuture.cancel(true); + log.info("Timeout to process message: {}", pendingMsgHolder.getMsg()); + } + if (log.isDebugEnabled()) { + ctx.getAckMap().forEach((id, msg) -> log.debug("[{}] Timeout to process message: {}", id, msg.getValue())); + } + ctx.getFailedMap().forEach((id, msg) -> log.warn("[{}] Failed to process message: {}", id, msg.getValue())); + } + consumer.commit(); } @Override @@ -193,6 +249,33 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer // } // } // + + private void forwardToCalculatedFieldService(CalculatedFieldLinkedTelemetryMsgProto linkedMsg, TbCallback callback) { + var msg = linkedMsg.getMsg(); + var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); + var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); + ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onLinkedTelemetryMsg(linkedMsg, callback)); + DonAsynchron.withCallback(future, + __ -> callback.onSuccess(), + t -> { + log.warn("[{}] Failed to process calculated field message for calculated field [{}]", tenantId.getId(), calculatedFieldId.getId(), t); + callback.onFailure(t); + }); + + } + + private void forwardToCalculatedFieldService(CalculatedFieldTelemetryMsgProto msg, TbCallback callback) { + var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); + var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); + ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onTelemetryMsg(msg, callback)); + DonAsynchron.withCallback(future, + __ -> callback.onSuccess(), + t -> { + log.warn("[{}] Failed to process calculated field message for calculated field [{}]", tenantId.getId(), calculatedFieldId.getId(), t); + callback.onFailure(t); + }); + } + private void forwardToCalculatedFieldService(TransportProtos.ComponentLifecycleMsgProto msg, TbCallback callback) { var tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); var calculatedFieldId = new CalculatedFieldId(new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB())); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index e3a1ca22d3..ca42298742 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -269,7 +269,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> ctx = new TbPackProcessingContext<>( processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>()); - PendingMsgHolder pendingMsgHolder = new PendingMsgHolder(); + PendingMsgHolder pendingMsgHolder = new PendingMsgHolder<>(); Future packSubmitFuture = consumersExecutor.submit(() -> { orderedMsgList.forEach((element) -> { UUID id = element.getUuid(); @@ -278,7 +278,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService(id, ctx); try { ToCoreMsg toCoreMsg = msg.getValue(); - pendingMsgHolder.setToCoreMsg(toCoreMsg); + pendingMsgHolder.setMsg(toCoreMsg); if (toCoreMsg.hasToSubscriptionMgrMsg()) { log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg()); forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback); @@ -335,8 +335,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService log.debug("[{}] Timeout to process message: {}", id, msg.getValue())); @@ -346,12 +345,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> ctx = new TbPackProcessingContext<>( processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>()); - PendingMsgHolder pendingMsgHolder = new PendingMsgHolder(); + PendingMsgHolder pendingMsgHolder = new PendingMsgHolder<>(); Future submitFuture = consumersExecutor.submit(() -> { orderedMsgList.forEach((element) -> { UUID id = element.getUuid(); @@ -145,7 +145,7 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService(id, ctx); try { ToEdgeMsg toEdgeMsg = msg.getValue(); - pendingMsgHolder.setToEdgeMsg(toEdgeMsg); + pendingMsgHolder.setMsg(toEdgeMsg); if (toEdgeMsg.hasEdgeNotificationMsg()) { pushNotificationToEdge(toEdgeMsg.getEdgeNotificationMsg(), 0, packProcessingRetries, callback); } @@ -161,20 +161,13 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService log.warn("[{}] Failed to process message: {}", id, msg.getValue())); } consumer.commit(); } - private static class PendingMsgHolder { - @Getter - @Setter - private volatile ToEdgeMsg toEdgeMsg; - } - @Override protected ServiceType getServiceType() { return ServiceType.TB_CORE; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/PendingMsgHolder.java b/application/src/main/java/org/thingsboard/server/service/queue/PendingMsgHolder.java new file mode 100644 index 0000000000..8793e45da6 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/PendingMsgHolder.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import lombok.Getter; +import lombok.Setter; + +public class PendingMsgHolder { + @Getter @Setter + private volatile T msg; +}