merged with master

This commit is contained in:
dashevchenko 2025-03-11 17:22:57 +02:00
commit 5634829d70
23 changed files with 117 additions and 92 deletions

View File

@ -24,6 +24,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
@ -34,6 +35,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
@ -74,7 +76,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
final EntityId entityId; final EntityId entityId;
final CalculatedFieldProcessingService cfService; final CalculatedFieldProcessingService cfService;
final CalculatedFieldStateService cfStateService; final CalculatedFieldStateService cfStateService;
final int partition;
TbActorCtx ctx; TbActorCtx ctx;
Map<CalculatedFieldId, CalculatedFieldState> states = new HashMap<>(); Map<CalculatedFieldId, CalculatedFieldState> states = new HashMap<>();
@ -85,7 +86,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
this.entityId = entityId; this.entityId = entityId;
this.cfService = systemContext.getCalculatedFieldProcessingService(); this.cfService = systemContext.getCalculatedFieldProcessingService();
this.cfStateService = systemContext.getCalculatedFieldStateService(); this.cfStateService = systemContext.getCalculatedFieldStateService();
this.partition = systemContext.getCalculatedFieldEntityProfileCache().getEntityIdPartition(tenantId, entityId);
} }
void init(TbActorCtx ctx) { void init(TbActorCtx ctx) {
@ -93,8 +93,8 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
} }
public void process(CalculatedFieldPartitionChangeMsg msg) { public void process(CalculatedFieldPartitionChangeMsg msg) {
if (!msg.getPartitions()[partition]) { if (!systemContext.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId).isMyPartition()) {
log.info("[{}][{}] Stopping entity actor due to change partition event.", partition, entityId); log.info("[{}] Stopping entity actor due to change partition event.", entityId);
ctx.stop(ctx.getSelf()); ctx.stop(ctx.getSelf());
} }
} }
@ -153,7 +153,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
} }
public void process(EntityCalculatedFieldTelemetryMsg msg) throws CalculatedFieldException { public void process(EntityCalculatedFieldTelemetryMsg msg) throws CalculatedFieldException {
log.info("[{}] Processing CF telemetry msg.", msg.getEntityId()); log.debug("[{}] Processing CF telemetry msg.", msg.getEntityId());
var proto = msg.getProto(); var proto = msg.getProto();
var numberOfCallbacks = CALLBACKS_PER_CF * (msg.getEntityIdFields().size() + msg.getProfileIdFields().size()); var numberOfCallbacks = CALLBACKS_PER_CF * (msg.getEntityIdFields().size() + msg.getProfileIdFields().size());
MultipleTbCallback callback = new MultipleTbCallback(numberOfCallbacks, msg.getCallback()); MultipleTbCallback callback = new MultipleTbCallback(numberOfCallbacks, msg.getCallback());
@ -168,7 +168,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
} }
public void process(EntityCalculatedFieldLinkedTelemetryMsg msg) throws CalculatedFieldException { public void process(EntityCalculatedFieldLinkedTelemetryMsg msg) throws CalculatedFieldException {
log.info("[{}] Processing CF link telemetry msg.", msg.getEntityId()); log.debug("[{}] Processing CF link telemetry msg.", msg.getEntityId());
var proto = msg.getProto(); var proto = msg.getProto();
var ctx = msg.getCtx(); var ctx = msg.getCtx();
var callback = new MultipleTbCallback(CALLBACKS_PER_CF, msg.getCallback()); var callback = new MultipleTbCallback(CALLBACKS_PER_CF, msg.getCallback());

View File

@ -318,14 +318,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) { public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) {
EntityId entityId = msg.getEntityId(); EntityId entityId = msg.getEntityId();
log.info("Received telemetry msg from entity [{}]", entityId); log.debug("Received telemetry msg from entity [{}]", entityId);
// 2 = 1 for CF processing + 1 for links processing // 2 = 1 for CF processing + 1 for links processing
MultipleTbCallback callback = new MultipleTbCallback(2, msg.getCallback()); MultipleTbCallback callback = new MultipleTbCallback(2, msg.getCallback());
// process all cfs related to entity, or it's profile; // process all cfs related to entity, or it's profile;
var entityIdFields = getCalculatedFieldsByEntityId(entityId); var entityIdFields = getCalculatedFieldsByEntityId(entityId);
var profileIdFields = getCalculatedFieldsByEntityId(getProfileId(tenantId, entityId)); var profileIdFields = getCalculatedFieldsByEntityId(getProfileId(tenantId, entityId));
if (!entityIdFields.isEmpty() || !profileIdFields.isEmpty()) { if (!entityIdFields.isEmpty() || !profileIdFields.isEmpty()) {
log.info("Pushing telemetry msg to specific actor [{}]", entityId); log.debug("Pushing telemetry msg to specific actor [{}]", entityId);
getOrCreateActor(entityId).tell(new EntityCalculatedFieldTelemetryMsg(msg, entityIdFields, profileIdFields, callback)); getOrCreateActor(entityId).tell(new EntityCalculatedFieldTelemetryMsg(msg, entityIdFields, profileIdFields, callback));
} else { } else {
callback.onSuccess(); callback.onSuccess();
@ -342,7 +342,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
public void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsg msg) { public void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsg msg) {
EntityId sourceEntityId = msg.getEntityId(); EntityId sourceEntityId = msg.getEntityId();
log.info("Received linked telemetry msg from entity [{}]", sourceEntityId); log.debug("Received linked telemetry msg from entity [{}]", sourceEntityId);
var proto = msg.getProto(); var proto = msg.getProto();
var linksList = proto.getLinksList(); var linksList = proto.getLinksList();
for (var linkProto : linksList) { for (var linkProto : linksList) {
@ -357,14 +357,14 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
MultipleTbCallback callback = new MultipleTbCallback(entityIds.size(), msg.getCallback()); MultipleTbCallback callback = new MultipleTbCallback(entityIds.size(), msg.getCallback());
var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, callback); var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, callback);
entityIds.forEach(entityId -> { entityIds.forEach(entityId -> {
log.info("Pushing linked telemetry msg to specific actor [{}]", entityId); log.debug("Pushing linked telemetry msg to specific actor [{}]", entityId);
getOrCreateActor(entityId).tell(newMsg); getOrCreateActor(entityId).tell(newMsg);
}); });
} else { } else {
msg.getCallback().onSuccess(); msg.getCallback().onSuccess();
} }
} else { } else {
log.info("Pushing linked telemetry msg to specific actor [{}]", targetEntityId); log.debug("Pushing linked telemetry msg to specific actor [{}]", targetEntityId);
var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, msg.getCallback()); var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, msg.getCallback());
getOrCreateActor(targetEntityId).tell(newMsg); getOrCreateActor(targetEntityId).tell(newMsg);
} }

View File

@ -91,6 +91,15 @@ public class TenantActor extends RuleChainManagerActor {
isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE); isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
if (isRuleEngine) { if (isRuleEngine) {
if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) { if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {
try {
//TODO: IM - extend API usage to have CF Exec Enabled? Not in 4.0;
cfActor = ctx.getOrCreateChildActor(new TbStringActorId("CFM|" + tenantId),
() -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME,
() -> new CalculatedFieldManagerActorCreator(systemContext, tenantId),
() -> true);
} catch (Exception e) {
log.info("[{}] Failed to init CF Actor.", tenantId, e);
}
try { try {
if (getApiUsageState().isReExecEnabled()) { if (getApiUsageState().isReExecEnabled()) {
log.debug("[{}] Going to init rule chains", tenantId); log.debug("[{}] Going to init rule chains", tenantId);
@ -98,11 +107,6 @@ public class TenantActor extends RuleChainManagerActor {
} else { } else {
log.info("[{}] Skip init of the rule chains due to API limits", tenantId); log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
} }
//TODO: IM - extend API usage to have CF Exec Enabled? Not in 4.0;
cfActor = ctx.getOrCreateChildActor(new TbStringActorId("CFM|" + tenantId),
() -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME,
() -> new CalculatedFieldManagerActorCreator(systemContext, tenantId),
() -> true);
} catch (Exception e) { } catch (Exception e) {
log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e); log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e);
cantFindTenant = true; cantFindTenant = true;
@ -185,6 +189,10 @@ public class TenantActor extends RuleChainManagerActor {
} }
private void onToCalculatedFieldSystemActorMsg(ToCalculatedFieldSystemMsg msg, boolean priority) { private void onToCalculatedFieldSystemActorMsg(ToCalculatedFieldSystemMsg msg, boolean priority) {
if (cfActor == null) {
log.warn("[{}] CF Actor is not initialized.", tenantId);
return;
}
if (priority) { if (priority) {
cfActor.tellWithHighPriority(msg); cfActor.tellWithHighPriority(msg);
} else { } else {
@ -251,11 +259,25 @@ public class TenantActor extends RuleChainManagerActor {
ServiceType serviceType = msg.getServiceType(); ServiceType serviceType = msg.getServiceType();
if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) { if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) { if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {
if (cfActor == null) {
try {
//TODO: IM - extend API usage to have CF Exec Enabled? Not in 4.0;
cfActor = ctx.getOrCreateChildActor(new TbStringActorId("CFM|" + tenantId),
() -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME,
() -> new CalculatedFieldManagerActorCreator(systemContext, tenantId),
() -> true);
} catch (Exception e) {
log.info("[{}] Failed to init CF Actor.", tenantId, e);
}
}
if (!ruleChainsInitialized) { if (!ruleChainsInitialized) {
log.info("Tenant {} is now managed by this service, initializing rule chains", tenantId); log.info("Tenant {} is now managed by this service, initializing rule chains", tenantId);
initRuleChains(); initRuleChains();
} }
} else { } else {
if (cfActor != null) {
ctx.stop(cfActor.getActorId());
}
if (ruleChainsInitialized) { if (ruleChainsInitialized) {
log.info("Tenant {} is no longer managed by this service, stopping rule chains", tenantId); log.info("Tenant {} is no longer managed by this service, stopping rule chains", tenantId);
destroyRuleChains(); destroyRuleChains();

View File

@ -30,6 +30,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
import org.thingsboard.server.actors.calculatedField.MultipleTbCallback; import org.thingsboard.server.actors.calculatedField.MultipleTbCallback;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.Argument;
@ -51,6 +52,7 @@ import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.attributes.AttributesService;
@ -200,7 +202,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
if (broadcast) { if (broadcast) {
broadcasts.add(link); broadcasts.add(link);
} else { } else {
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, link.entityId()); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, link.tenantId(), link.entityId());
unicasts.computeIfAbsent(tpi, k -> new ArrayList<>()).add(link); unicasts.computeIfAbsent(tpi, k -> new ArrayList<>()).add(link);
} }
} }

View File

@ -18,8 +18,10 @@ package org.thingsboard.server.service.cf.cache;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.QueueKey;
@ -57,7 +59,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
@Override @Override
public void add(TenantId tenantId, EntityId profileId, EntityId entityId) { public void add(TenantId tenantId, EntityId profileId, EntityId entityId) {
var tpi = partitionService.resolve(QueueKey.CF, entityId); var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
var partition = tpi.getPartition().orElse(UNKNOWN); var partition = tpi.getPartition().orElse(UNKNOWN);
tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()) tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache())
.add(profileId, entityId, partition, tpi.isMyPartition()); .add(profileId, entityId, partition, tpi.isMyPartition());
@ -65,7 +67,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
@Override @Override
public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) { public void update(TenantId tenantId, EntityId oldProfileId, EntityId newProfileId, EntityId entityId) {
var tpi = partitionService.resolve(QueueKey.CF, entityId); var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
var partition = tpi.getPartition().orElse(UNKNOWN); var partition = tpi.getPartition().orElse(UNKNOWN);
var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache()); var cache = tenantCache.computeIfAbsent(tenantId, id -> new TenantEntityProfileCache());
//TODO: make this method atomic; //TODO: make this method atomic;
@ -86,7 +88,7 @@ public class DefaultCalculatedFieldEntityProfileCache extends TbApplicationEvent
@Override @Override
public int getEntityIdPartition(TenantId tenantId, EntityId entityId) { public int getEntityIdPartition(TenantId tenantId, EntityId entityId) {
var tpi = partitionService.resolve(QueueKey.CF, entityId); var tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
return tpi.getPartition().orElse(UNKNOWN); return tpi.getPartition().orElse(UNKNOWN);
} }

View File

@ -34,7 +34,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.usagerecord.ApiLimitService; import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
@ -44,7 +43,6 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
@Data @Data
public class CalculatedFieldCtx { public class CalculatedFieldCtx {
@ -58,8 +56,6 @@ public class CalculatedFieldCtx {
private final Map<String, Argument> arguments; private final Map<String, Argument> arguments;
private final Map<ReferencedEntityKey, String> mainEntityArguments; private final Map<ReferencedEntityKey, String> mainEntityArguments;
private final Map<EntityId, Map<ReferencedEntityKey, String>> linkedEntityArguments; private final Map<EntityId, Map<ReferencedEntityKey, String>> linkedEntityArguments;
private final Map<TbPair<EntityId, ReferencedEntityKey>, String> referencedEntityKeys;
private final List<String> argNames; private final List<String> argNames;
private Output output; private Output output;
private String expression; private String expression;
@ -93,11 +89,6 @@ public class CalculatedFieldCtx {
linkedEntityArguments.computeIfAbsent(refId, key -> new HashMap<>()).put(refKey, entry.getKey()); linkedEntityArguments.computeIfAbsent(refId, key -> new HashMap<>()).put(refKey, entry.getKey());
} }
} }
this.referencedEntityKeys = arguments.entrySet().stream()
.collect(Collectors.toMap(
entry -> new TbPair<>(entry.getValue().getRefEntityId() == null ? entityId : entry.getValue().getRefEntityId(), entry.getValue().getRefEntityKey()),
Map.Entry::getKey
));
this.argNames = new ArrayList<>(arguments.keySet()); this.argNames = new ArrayList<>(arguments.keySet());
this.output = configuration.getOutput(); this.output = configuration.getOutput();
this.expression = configuration.getExpression(); this.expression = configuration.getExpression();

View File

@ -20,10 +20,12 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
@ -67,9 +69,11 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
@Override @Override
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) { public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToCalculatedFieldMsg>> eventConsumer) {
super.init(eventConsumer); super.init(eventConsumer);
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME);
this.stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<CalculatedFieldStateProto>>create() this.stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<CalculatedFieldStateProto>>create()
.queueKey(QueueKey.CF_STATES) .queueKey(queueKey)
.topic(partitionService.getTopic(QueueKey.CF_STATES)) .topic(partitionService.getTopic(queueKey))
.pollInterval(pollInterval) .pollInterval(pollInterval)
.msgPackProcessor((msgs, consumer, config) -> { .msgPackProcessor((msgs, consumer, config) -> {
for (TbProtoQueueMsg<CalculatedFieldStateProto> msg : msgs) { for (TbProtoQueueMsg<CalculatedFieldStateProto> msg : msgs) {
@ -101,7 +105,7 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta
@Override @Override
protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) { protected void doPersist(CalculatedFieldEntityCtxId stateId, CalculatedFieldStateProto stateMsgProto, TbCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF_STATES, stateId.entityId()); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_STATES_QUEUE_NAME, stateId.tenantId(), stateId.entityId());
TbProtoQueueMsg<CalculatedFieldStateProto> msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto); TbProtoQueueMsg<CalculatedFieldStateProto> msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto);
if (stateMsgProto == null) { if (stateMsgProto == null) {
putStateId(msg.getHeaders(), stateId); putStateId(msg.getHeaders(), stateId);

View File

@ -27,6 +27,7 @@ import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg; import org.thingsboard.server.actors.calculatedField.CalculatedFieldLinkedTelemetryMsg;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.QueueConfig; import org.thingsboard.server.common.data.queue.QueueConfig;
@ -79,8 +80,6 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
private long pollInterval; private long pollInterval;
@Value("${queue.calculated_fields.pack_processing_timeout:60000}") @Value("${queue.calculated_fields.pack_processing_timeout:60000}")
private long packProcessingTimeout; private long packProcessingTimeout;
@Value("${queue.calculated_fields.pool_size:8}")
private int poolSize;
private final TbRuleEngineQueueFactory queueFactory; private final TbRuleEngineQueueFactory queueFactory;
private final CalculatedFieldStateService stateService; private final CalculatedFieldStateService stateService;
@ -108,9 +107,10 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
public void init() { public void init() {
super.init("tb-cf"); super.init("tb-cf");
var queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
this.eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create() this.eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToCalculatedFieldMsg>>create()
.queueKey(QueueKey.CF) .queueKey(queueKey)
.topic(partitionService.getTopic(QueueKey.CF)) .topic(partitionService.getTopic(queueKey))
.pollInterval(pollInterval) .pollInterval(pollInterval)
.msgPackProcessor(this::processMsgs) .msgPackProcessor(this::processMsgs)
.consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer()) .consumerCreator((config, partitionId) -> queueFactory.createToCalculatedFieldMsgConsumer())
@ -140,20 +140,12 @@ public class DefaultTbCalculatedFieldConsumerService extends AbstractConsumerSer
// Cleanup old entities after corresponding consumers are stopped. // Cleanup old entities after corresponding consumers are stopped.
// Any periodic tasks need to check that the entity is still managed by the current server before processing. // Any periodic tasks need to check that the entity is still managed by the current server before processing.
actorContext.tell(new CalculatedFieldPartitionChangeMsg(partitionsToBooleanIndexArray(partitions))); actorContext.tell(new CalculatedFieldPartitionChangeMsg());
} catch (Throwable t) { } catch (Throwable t) {
log.error("Failed to process partition change event: {}", event, t); log.error("Failed to process partition change event: {}", event, t);
} }
} }
private boolean[] partitionsToBooleanIndexArray(Set<TopicPartitionInfo> partitions) {
boolean[] myPartitions = new boolean[partitionService.getTotalCalculatedFieldPartitions()];
for (var tpi : partitions) {
tpi.getPartition().ifPresent(partition -> myPartitions[partition] = true);
}
return myPartitions;
}
private void processMsgs(List<TbProtoQueueMsg<ToCalculatedFieldMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumer, QueueConfig config) throws Exception { private void processMsgs(List<TbProtoQueueMsg<ToCalculatedFieldMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToCalculatedFieldMsg>> consumer, QueueConfig config) throws Exception {
List<IdMsgPair<ToCalculatedFieldMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList(); List<IdMsgPair<ToCalculatedFieldMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).toList();
ConcurrentMap<UUID, TbProtoQueueMsg<ToCalculatedFieldMsg>> pendingMap = orderedMsgList.stream().collect( ConcurrentMap<UUID, TbProtoQueueMsg<ToCalculatedFieldMsg>> pendingMap = orderedMsgList.stream().collect(

View File

@ -358,7 +358,7 @@ public class DefaultTbClusterService implements TbClusterService {
@Override @Override
public void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback) { public void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, entityId); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, callback); pushMsgToCalculatedFields(tpi, UUID.randomUUID(), msg, callback);
} }
@ -371,7 +371,7 @@ public class DefaultTbClusterService implements TbClusterService {
@Override @Override
public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) { public void pushNotificationToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldNotificationMsg msg, TbQueueCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(QueueKey.CF, entityId); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId);
producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback); producerProvider.getCalculatedFieldsNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback);
toRuleEngineNfs.incrementAndGet(); toRuleEngineNfs.incrementAndGet();
} }

View File

@ -22,6 +22,7 @@ import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
@ -108,7 +109,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
@Override @Override
protected void onTbApplicationEvent(PartitionChangeEvent event) { protected void onTbApplicationEvent(PartitionChangeEvent event) {
event.getNewPartitions().forEach((queueKey, partitions) -> { event.getNewPartitions().forEach((queueKey, partitions) -> {
if (CollectionsUtil.isOneOf(queueKey, QueueKey.CF, QueueKey.CF_STATES)) { if (DataConstants.CF_QUEUE_NAME.equals(queueKey.getQueueName()) || DataConstants.CF_STATES_QUEUE_NAME.equals(queueKey.getQueueName())) {
return; return;
} }
if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) { if (partitionService.isManagedByCurrentService(queueKey.getTenantId())) {

View File

@ -1757,7 +1757,6 @@ queue:
stats: stats:
# Enable/disable statistics for EDQS # Enable/disable statistics for EDQS
enabled: "${TB_EDQS_STATS_ENABLED:true}" enabled: "${TB_EDQS_STATS_ENABLED:true}"
vc: vc:
# Default topic name # Default topic name
topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}" topic: "${TB_QUEUE_VC_TOPIC:tb_version_control}"
@ -1814,9 +1813,7 @@ queue:
# For high-priority notifications that require minimum latency and processing time # For high-priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_CF_NOTIFICATIONS_TOPIC:calculated_field.notifications}" notifications_topic: "${TB_QUEUE_CF_NOTIFICATIONS_TOPIC:calculated_field.notifications}"
# Interval in milliseconds to poll messages by CF (Rule Engine) microservices # Interval in milliseconds to poll messages by CF (Rule Engine) microservices
poll_interval: "${TB_QUEUE_CF_POLL_INTERVAL_MS:25}" poll_interval: "${TB_QUEUE_CF_POLL_INTERVAL_MS:1000}"
# Amount of partitions used by CF microservices
partitions: "${TB_QUEUE_CF_PARTITIONS:10}"
# Timeout for processing a message pack by CF microservices # Timeout for processing a message pack by CF microservices
pack_processing_timeout: "${TB_QUEUE_CF_PACK_PROCESSING_TIMEOUT_MS:60000}" pack_processing_timeout: "${TB_QUEUE_CF_PACK_PROCESSING_TIMEOUT_MS:60000}"
# Thread pool size for processing of the incoming messages # Thread pool size for processing of the incoming messages

View File

@ -26,8 +26,6 @@ import java.util.Set;
@Data @Data
public class CalculatedFieldPartitionChangeMsg implements ToCalculatedFieldSystemMsg { public class CalculatedFieldPartitionChangeMsg implements ToCalculatedFieldSystemMsg {
private final boolean[] partitions;
@Override @Override
public TenantId getTenantId() { public TenantId getTenantId() {
return TenantId.SYS_TENANT_ID; return TenantId.SYS_TENANT_ID;

View File

@ -24,6 +24,7 @@ import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.exception.TenantNotFoundException; import org.thingsboard.server.common.data.exception.TenantNotFoundException;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
@ -67,8 +68,6 @@ public class HashPartitionService implements PartitionService {
private String cfEventTopic; private String cfEventTopic;
@Value("${queue.calculated_fields.state_topic:tb_cf_state}") @Value("${queue.calculated_fields.state_topic:tb_cf_state}")
private String cfStateTopic; private String cfStateTopic;
@Value("${queue.calculated_fields.partitions:10}")
private Integer cfPartitions;
@Value("${queue.vc.topic:tb_version_control}") @Value("${queue.vc.topic:tb_version_control}")
private String vcTopic; private String vcTopic;
@Value("${queue.vc.partitions:10}") @Value("${queue.vc.partitions:10}")
@ -122,11 +121,6 @@ public class HashPartitionService implements PartitionService {
partitionSizesMap.put(coreKey, corePartitions); partitionSizesMap.put(coreKey, corePartitions);
partitionTopicsMap.put(coreKey, coreTopic); partitionTopicsMap.put(coreKey, coreTopic);
partitionSizesMap.put(QueueKey.CF, cfPartitions);
partitionTopicsMap.put(QueueKey.CF, cfEventTopic);
partitionSizesMap.put(QueueKey.CF_STATES, cfPartitions);
partitionTopicsMap.put(QueueKey.CF_STATES, cfStateTopic);
QueueKey vcKey = new QueueKey(ServiceType.TB_VC_EXECUTOR); QueueKey vcKey = new QueueKey(ServiceType.TB_VC_EXECUTOR);
partitionSizesMap.put(vcKey, vcPartitions); partitionSizesMap.put(vcKey, vcPartitions);
partitionTopicsMap.put(vcKey, vcTopic); partitionTopicsMap.put(vcKey, vcTopic);
@ -165,6 +159,14 @@ public class HashPartitionService implements PartitionService {
List<QueueRoutingInfo> queueRoutingInfoList = getQueueRoutingInfos(); List<QueueRoutingInfo> queueRoutingInfoList = getQueueRoutingInfos();
queueRoutingInfoList.forEach(queue -> { queueRoutingInfoList.forEach(queue -> {
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue);
if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) {
QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME);
partitionSizesMap.put(cfQueueKey, queue.getPartitions());
partitionTopicsMap.put(cfQueueKey, cfEventTopic);
QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME);
partitionSizesMap.put(cfQueueStatesKey, queue.getPartitions());
partitionTopicsMap.put(cfQueueStatesKey, cfStateTopic);
}
partitionTopicsMap.put(queueKey, queue.getQueueTopic()); partitionTopicsMap.put(queueKey, queue.getQueueTopic());
partitionSizesMap.put(queueKey, queue.getPartitions()); partitionSizesMap.put(queueKey, queue.getPartitions());
queueConfigs.put(queueKey, new QueueConfig(queue)); queueConfigs.put(queueKey, new QueueConfig(queue));
@ -213,6 +215,14 @@ public class HashPartitionService implements PartitionService {
QueueRoutingInfo queueRoutingInfo = new QueueRoutingInfo(queueUpdateMsg); QueueRoutingInfo queueRoutingInfo = new QueueRoutingInfo(queueUpdateMsg);
TenantId tenantId = queueRoutingInfo.getTenantId(); TenantId tenantId = queueRoutingInfo.getTenantId();
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueRoutingInfo.getQueueName(), tenantId); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueRoutingInfo.getQueueName(), tenantId);
if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) {
QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME);
partitionSizesMap.put(cfQueueKey, queueRoutingInfo.getPartitions());
partitionTopicsMap.put(cfQueueKey, cfEventTopic);
QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME);
partitionSizesMap.put(cfQueueStatesKey, queueRoutingInfo.getPartitions());
partitionTopicsMap.put(cfQueueStatesKey, cfStateTopic);
}
partitionTopicsMap.put(queueKey, queueRoutingInfo.getQueueTopic()); partitionTopicsMap.put(queueKey, queueRoutingInfo.getQueueTopic());
partitionSizesMap.put(queueKey, queueRoutingInfo.getPartitions()); partitionSizesMap.put(queueKey, queueRoutingInfo.getPartitions());
queueConfigs.put(queueKey, new QueueConfig(queueRoutingInfo)); queueConfigs.put(queueKey, new QueueConfig(queueRoutingInfo));
@ -252,6 +262,15 @@ public class HashPartitionService implements PartitionService {
partitionTopicsMap.remove(queueKey); partitionTopicsMap.remove(queueKey);
partitionSizesMap.remove(queueKey); partitionSizesMap.remove(queueKey);
queueConfigs.remove(queueKey); queueConfigs.remove(queueKey);
if (DataConstants.MAIN_QUEUE_NAME.equals(queueKey.getQueueName())) {
QueueKey cfQueueKey = queueKey.withQueueName(DataConstants.CF_QUEUE_NAME);
partitionSizesMap.remove(cfQueueKey);
partitionTopicsMap.remove(cfQueueKey);
QueueKey cfQueueStatesKey = queueKey.withQueueName(DataConstants.CF_STATES_QUEUE_NAME);
partitionSizesMap.remove(cfQueueStatesKey);
partitionTopicsMap.remove(cfQueueStatesKey);
}
} }
@Override @Override
@ -336,8 +355,7 @@ public class HashPartitionService implements PartitionService {
} }
} }
@Override private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) {
public TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) {
Integer partitionSize = partitionSizesMap.get(queueKey); Integer partitionSize = partitionSizesMap.get(queueKey);
if (partitionSize == null) { if (partitionSize == null) {
throw new IllegalStateException("Partitions info for queue " + queueKey + " is missing"); throw new IllegalStateException("Partitions info for queue " + queueKey + " is missing");
@ -552,11 +570,6 @@ public class HashPartitionService implements PartitionService {
return list == null ? 0 : list.size(); return list == null ? 0 : list.size();
} }
@Override
public int getTotalCalculatedFieldPartitions() {
return cfPartitions;
}
private Map<QueueKey, List<ServiceInfo>> getServiceKeyListMap(List<ServiceInfo> services) { private Map<QueueKey, List<ServiceInfo>> getServiceKeyListMap(List<ServiceInfo> services) {
final Map<QueueKey, List<ServiceInfo>> currentMap = new HashMap<>(); final Map<QueueKey, List<ServiceInfo>> currentMap = new HashMap<>();
services.forEach(serviceInfo -> { services.forEach(serviceInfo -> {

View File

@ -37,8 +37,6 @@ public interface PartitionService {
TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId); TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId);
TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId);
List<TopicPartitionInfo> resolveAll(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId); List<TopicPartitionInfo> resolveAll(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId);
boolean isMyPartition(ServiceType serviceType, TenantId tenantId, EntityId entityId); boolean isMyPartition(ServiceType serviceType, TenantId tenantId, EntityId entityId);
@ -81,6 +79,4 @@ public interface PartitionService {
int resolvePartitionIndex(UUID entityId, int partitions); int resolvePartitionIndex(UUID entityId, int partitions);
int getTotalCalculatedFieldPartitions();
} }

View File

@ -35,9 +35,6 @@ public class QueueKey {
private final String queueName; private final String queueName;
private final TenantId tenantId; private final TenantId tenantId;
public static final QueueKey CF = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_QUEUE_NAME);
public static final QueueKey CF_STATES = new QueueKey(ServiceType.TB_RULE_ENGINE).withQueueName(CF_STATES_QUEUE_NAME);
public QueueKey(ServiceType type, Queue queue) { public QueueKey(ServiceType type, Queue queue) {
this.type = type; this.type = type;
this.queueName = queue.getName(); this.queueName = queue.getName();

View File

@ -64,10 +64,10 @@ public class PartitionChangeEvent extends TbApplicationEvent {
} }
public Set<TopicPartitionInfo> getCfPartitions() { public Set<TopicPartitionInfo> getCfPartitions() {
return newPartitions.getOrDefault(QueueKey.CF, Collections.emptySet()); return getPartitionsByServiceTypeAndQueueName(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME);
} }
private Set<TopicPartitionInfo> getPartitionsByServiceTypeAndQueueName(ServiceType serviceType, String queueName) { public Set<TopicPartitionInfo> getPartitionsByServiceTypeAndQueueName(ServiceType serviceType, String queueName) {
return newPartitions.entrySet() return newPartitions.entrySet()
.stream() .stream()
.filter(entry -> serviceType.equals(entry.getKey().getType()) && queueName.equals(entry.getKey().getQueueName())) .filter(entry -> serviceType.equals(entry.getKey().getType()) && queueName.equals(entry.getKey().getQueueName()))

View File

@ -18,8 +18,6 @@ package org.thingsboard.script.api.tbel;
import com.google.common.primitives.Bytes; import com.google.common.primitives.Bytes;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
import org.mvel2.ConversionHandler;
import org.mvel2.DataConversion;
import org.mvel2.ExecutionContext; import org.mvel2.ExecutionContext;
import org.mvel2.ParserConfiguration; import org.mvel2.ParserConfiguration;
import org.mvel2.execution.ExecutionArrayList; import org.mvel2.execution.ExecutionArrayList;
@ -259,6 +257,8 @@ public class TbUtils {
float.class, int.class))); float.class, int.class)));
parserConfig.addImport("toInt", new MethodStub(TbUtils.class.getMethod("toInt", parserConfig.addImport("toInt", new MethodStub(TbUtils.class.getMethod("toInt",
double.class))); double.class)));
parserConfig.addImport("isNaN", new MethodStub(TbUtils.class.getMethod("isNaN",
double.class)));
parserConfig.addImport("hexToBytes", new MethodStub(TbUtils.class.getMethod("hexToBytes", parserConfig.addImport("hexToBytes", new MethodStub(TbUtils.class.getMethod("hexToBytes",
ExecutionContext.class, String.class))); ExecutionContext.class, String.class)));
parserConfig.addImport("hexToBytesArray", new MethodStub(TbUtils.class.getMethod("hexToBytesArray", parserConfig.addImport("hexToBytesArray", new MethodStub(TbUtils.class.getMethod("hexToBytesArray",
@ -1163,6 +1163,10 @@ public class TbUtils {
return BigDecimal.valueOf(value).setScale(0, RoundingMode.HALF_UP).intValue(); return BigDecimal.valueOf(value).setScale(0, RoundingMode.HALF_UP).intValue();
} }
public static boolean isNaN(double value) {
return Double.isNaN(value);
}
public static ExecutionHashMap<String, Object> toFlatMap(ExecutionContext ctx, Map<String, Object> json) { public static ExecutionHashMap<String, Object> toFlatMap(ExecutionContext ctx, Map<String, Object> json) {
return toFlatMap(ctx, json, new ArrayList<>(), true); return toFlatMap(ctx, json, new ArrayList<>(), true);
} }

View File

@ -1138,12 +1138,18 @@ public class TbUtilsTest {
} }
@Test @Test
void toInt() { public void toInt() {
Assertions.assertEquals(1729, TbUtils.toInt(doubleVal)); Assertions.assertEquals(1729, TbUtils.toInt(doubleVal));
Assertions.assertEquals(13, TbUtils.toInt(12.8)); Assertions.assertEquals(13, TbUtils.toInt(12.8));
Assertions.assertEquals(28, TbUtils.toInt(28.0)); Assertions.assertEquals(28, TbUtils.toInt(28.0));
} }
@Test
public void isNaN() {
Assertions.assertFalse(TbUtils.isNaN(doubleVal));
Assertions.assertTrue(TbUtils.isNaN(Double.NaN));
}
private static List<Byte> toList(byte[] data) { private static List<Byte> toList(byte[] data) {
List<Byte> result = new ArrayList<>(data.length); List<Byte> result = new ArrayList<>(data.length);
for (Byte b : data) { for (Byte b : data) {

View File

@ -19,10 +19,10 @@ version: '3.0'
services: services:
tb-edqs-1: tb-edqs-1:
volumes: volumes:
- tb-edqs-log-volume:/var/log/edqs - tb-edqs-log-volume:/var/log/tb-edqs
tb-edqs-2: tb-edqs-2:
volumes: volumes:
- tb-edqs-log-volume:/var/log/edqs - tb-edqs-log-volume:/var/log/tb-edqs
volumes: volumes:
tb-edqs-log-volume: tb-edqs-log-volume:

View File

@ -33,10 +33,10 @@ services:
restart: always restart: always
image: "${DOCKER_REPO}/${EDQS_DOCKER_NAME}:${TB_VERSION}" image: "${DOCKER_REPO}/${EDQS_DOCKER_NAME}:${TB_VERSION}"
env_file: env_file:
- edqs.env - tb-edqs.env
volumes: volumes:
- ./edqs/conf:/usr/share/edqs/conf - ./tb-edqs/conf:/usr/share/tb-edqs/conf
- ./edqs/log:/var/log/edqs - ./tb-edqs/log:/var/log/tb-edqs
ports: ports:
- "8080" - "8080"
depends_on: depends_on:
@ -46,10 +46,10 @@ services:
restart: always restart: always
image: "${DOCKER_REPO}/${EDQS_DOCKER_NAME}:${TB_VERSION}" image: "${DOCKER_REPO}/${EDQS_DOCKER_NAME}:${TB_VERSION}"
env_file: env_file:
- edqs.env - tb-edqs.env
volumes: volumes:
- ./edqs/conf:/usr/share/edqs/conf - ./tb-edqs/conf:/usr/share/tb-edqs/conf
- ./edqs/log:/var/log/edqs - ./tb-edqs/log:/var/log/tb-edqs
ports: ports:
- "8080" - "8080"
depends_on: depends_on:

View File

@ -21,10 +21,10 @@
<appender name="fileLogAppender" <appender name="fileLogAppender"
class="ch.qos.logback.core.rolling.RollingFileAppender"> class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/var/log/edqs/${TB_SERVICE_ID}/tb-edqs.log</file> <file>/var/log/tb-edqs/${TB_SERVICE_ID}/tb-edqs.log</file>
<rollingPolicy <rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>/var/log/edqs/tb-edqs.%d{yyyy-MM-dd}.%i.log</fileNamePattern> <fileNamePattern>/var/log/tb-edqs/tb-edqs.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<maxFileSize>100MB</maxFileSize> <maxFileSize>100MB</maxFileSize>
<maxHistory>30</maxHistory> <maxHistory>30</maxHistory>
<totalSizeCap>3GB</totalSizeCap> <totalSizeCap>3GB</totalSizeCap>

View File

@ -14,9 +14,9 @@
# limitations under the License. # limitations under the License.
# #
export JAVA_OPTS="$JAVA_OPTS -Xlog:gc*,heap*,age*,safepoint=debug:file=/var/log/edqs/${TB_SERVICE_ID}-gc.log:time,uptime,level,tags:filecount=10,filesize=10M" export JAVA_OPTS="$JAVA_OPTS -Xlog:gc*,heap*,age*,safepoint=debug:file=/var/log/tb-edqs/${TB_SERVICE_ID}-gc.log:time,uptime,level,tags:filecount=10,filesize=10M"
export JAVA_OPTS="$JAVA_OPTS -XX:+IgnoreUnrecognizedVMOptions -XX:+HeapDumpOnOutOfMemoryError" export JAVA_OPTS="$JAVA_OPTS -XX:+IgnoreUnrecognizedVMOptions -XX:+HeapDumpOnOutOfMemoryError"
export JAVA_OPTS="$JAVA_OPTS -XX:-UseBiasedLocking -XX:+UseTLAB -XX:+ResizeTLAB -XX:+PerfDisableSharedMem -XX:+UseCondCardMark" export JAVA_OPTS="$JAVA_OPTS -XX:-UseBiasedLocking -XX:+UseTLAB -XX:+ResizeTLAB -XX:+PerfDisableSharedMem -XX:+UseCondCardMark"
export JAVA_OPTS="$JAVA_OPTS -XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:+UseStringDeduplication -XX:+ParallelRefProcEnabled -XX:MaxTenuringThreshold=10" export JAVA_OPTS="$JAVA_OPTS -XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:+UseStringDeduplication -XX:+ParallelRefProcEnabled -XX:MaxTenuringThreshold=10"
export LOG_FILENAME=tb-edqs.out export LOG_FILENAME=tb-edqs.out
export LOADER_PATH=/usr/share/edqs/conf export LOADER_PATH=/usr/share/tb-edqs/conf