From 3d42a4ca04af2c21a89fc7fa11a8f9fd7adb970d Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 27 Jan 2025 17:57:39 +0200 Subject: [PATCH] Actor system implementation draft --- .../CalculatedFieldEntityMessageProcessor.java | 8 ++++---- .../cf/DefaultCalculatedFieldExecutionService.java | 6 ++++-- .../service/cf/ctx/state/CalculatedFieldState.java | 1 + .../org/thingsboard/server/common/util/ProtoUtils.java | 10 ++++++---- .../sql/cf/DefaultNativeCalculatedFieldRepository.java | 2 +- 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 6bb6256563..533eeb5e31 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -78,6 +78,10 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM this.ctx = ctx; } + public void process(CalculatedFieldStateRestoreMsg msg) { + states.put(msg.getId().cfId(), msg.getState()); + } + public void process(EntityCalculatedFieldTelemetryMsg msg) { var proto = msg.getProto(); var numberOfCallbacks = CALLBACKS_PER_CF * (msg.getEntityIdFields().size() + msg.getProfileIdFields().size()); @@ -192,8 +196,4 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } return cfIds; } - - public void process(CalculatedFieldStateRestoreMsg msg) { - states.put(msg.getId().cfId(), msg.getState()); - } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index bb8ba8bb63..3e37366da3 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -837,8 +837,10 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas telemetryMsg.setEntityIdMSB(entityId.getId().getMostSignificantBits()); telemetryMsg.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); - for (CalculatedFieldId cfId : calculatedFieldIds) { - telemetryMsg.addPreviousCalculatedFields(toProto(cfId)); + if(calculatedFieldIds != null) { + for (CalculatedFieldId cfId : calculatedFieldIds) { + telemetryMsg.addPreviousCalculatedFields(toProto(cfId)); + } } return telemetryMsg; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java index 4b7918cc03..f261e586a4 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java @@ -44,5 +44,6 @@ public interface CalculatedFieldState { ListenableFuture performCalculation(CalculatedFieldCtx ctx); + @JsonIgnore boolean isReady(); } diff --git a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index 6a40f13688..9bbce7c522 100644 --- a/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -644,11 +644,13 @@ public class ProtoUtils { } public static TransportProtos.TsKvProto toTsKvProto(TsKvEntry tsKvEntry) { - return TransportProtos.TsKvProto.newBuilder() + var builder = TransportProtos.TsKvProto.newBuilder() .setTs(tsKvEntry.getTs()) - .setKv(toKeyValueProto(tsKvEntry)) - .setVersion(tsKvEntry.getVersion()) - .build(); + .setKv(toKeyValueProto(tsKvEntry)); + if (tsKvEntry.getVersion() != null) { + builder.setVersion(tsKvEntry.getVersion()); + } + return builder.build(); } public static TransportProtos.KeyValueProto toKeyValueProto(KvEntry kvEntry) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/DefaultNativeCalculatedFieldRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/DefaultNativeCalculatedFieldRepository.java index bb88982e3d..677234dc20 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/DefaultNativeCalculatedFieldRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/DefaultNativeCalculatedFieldRepository.java @@ -77,7 +77,7 @@ public class DefaultNativeCalculatedFieldRepository implements NativeCalculatedF String name = (String) row.get("name"); int configurationVersion = (int) row.get("configuration_version"); JsonNode configuration = JacksonUtil.toJsonNode((String) row.get("configuration")); - long version = (long) row.get("version"); + long version = row.get("version") != null ? (long) row.get("version") : 0; Object externalIdObj = row.get("external_id"); CalculatedField calculatedField = new CalculatedField();