Actor system implementation draft

This commit is contained in:
Andrii Shvaika 2025-01-27 17:57:39 +02:00
parent d3278f05bb
commit 3d42a4ca04
5 changed files with 16 additions and 11 deletions

View File

@ -78,6 +78,10 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
this.ctx = ctx;
}
public void process(CalculatedFieldStateRestoreMsg msg) {
states.put(msg.getId().cfId(), msg.getState());
}
public void process(EntityCalculatedFieldTelemetryMsg msg) {
var proto = msg.getProto();
var numberOfCallbacks = CALLBACKS_PER_CF * (msg.getEntityIdFields().size() + msg.getProfileIdFields().size());
@ -192,8 +196,4 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
return cfIds;
}
public void process(CalculatedFieldStateRestoreMsg msg) {
states.put(msg.getId().cfId(), msg.getState());
}
}

View File

@ -837,8 +837,10 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
telemetryMsg.setEntityIdMSB(entityId.getId().getMostSignificantBits());
telemetryMsg.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
for (CalculatedFieldId cfId : calculatedFieldIds) {
telemetryMsg.addPreviousCalculatedFields(toProto(cfId));
if(calculatedFieldIds != null) {
for (CalculatedFieldId cfId : calculatedFieldIds) {
telemetryMsg.addPreviousCalculatedFields(toProto(cfId));
}
}
return telemetryMsg;

View File

@ -44,5 +44,6 @@ public interface CalculatedFieldState {
ListenableFuture<CalculatedFieldResult> performCalculation(CalculatedFieldCtx ctx);
@JsonIgnore
boolean isReady();
}

View File

@ -644,11 +644,13 @@ public class ProtoUtils {
}
public static TransportProtos.TsKvProto toTsKvProto(TsKvEntry tsKvEntry) {
return TransportProtos.TsKvProto.newBuilder()
var builder = TransportProtos.TsKvProto.newBuilder()
.setTs(tsKvEntry.getTs())
.setKv(toKeyValueProto(tsKvEntry))
.setVersion(tsKvEntry.getVersion())
.build();
.setKv(toKeyValueProto(tsKvEntry));
if (tsKvEntry.getVersion() != null) {
builder.setVersion(tsKvEntry.getVersion());
}
return builder.build();
}
public static TransportProtos.KeyValueProto toKeyValueProto(KvEntry kvEntry) {

View File

@ -77,7 +77,7 @@ public class DefaultNativeCalculatedFieldRepository implements NativeCalculatedF
String name = (String) row.get("name");
int configurationVersion = (int) row.get("configuration_version");
JsonNode configuration = JacksonUtil.toJsonNode((String) row.get("configuration"));
long version = (long) row.get("version");
long version = row.get("version") != null ? (long) row.get("version") : 0;
Object externalIdObj = row.get("external_id");
CalculatedField calculatedField = new CalculatedField();