Merge pull request #1807 from thingsboard/feature/period-in-seconds-pattern-alarm-originator
Added alarm originator source; Added period in seconds use metadata p…
This commit is contained in:
commit
12b2aa0086
@ -16,6 +16,7 @@
|
||||
package org.thingsboard.rule.engine.delay;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.thingsboard.rule.engine.api.RuleNode;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
@ -51,13 +52,11 @@ public class TbMsgDelayNode implements TbNode {
|
||||
private static final String TB_MSG_DELAY_NODE_MSG = "TbMsgDelayNodeMsg";
|
||||
|
||||
private TbMsgDelayNodeConfiguration config;
|
||||
private long delay;
|
||||
private Map<UUID, TbMsg> pendingMsgs;
|
||||
|
||||
@Override
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
this.config = TbNodeUtils.convert(configuration, TbMsgDelayNodeConfiguration.class);
|
||||
this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds());
|
||||
this.pendingMsgs = new HashMap<>();
|
||||
}
|
||||
|
||||
@ -72,13 +71,31 @@ public class TbMsgDelayNode implements TbNode {
|
||||
if(pendingMsgs.size() < config.getMaxPendingMsgs()) {
|
||||
pendingMsgs.put(msg.getId(), msg);
|
||||
TbMsg tickMsg = ctx.newMsg(TB_MSG_DELAY_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), msg.getId().toString());
|
||||
ctx.tellSelf(tickMsg, delay);
|
||||
ctx.tellSelf(tickMsg, getDelay(msg));
|
||||
} else {
|
||||
ctx.tellNext(msg, FAILURE, new RuntimeException("Max limit of pending messages reached!"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private long getDelay(TbMsg msg) {
|
||||
int periodInSeconds;
|
||||
if (config.isUseMetadataPeriodInSecondsPatterns()) {
|
||||
if (isParsable(msg, config.getPeriodInSecondsPattern())) {
|
||||
periodInSeconds = Integer.parseInt(TbNodeUtils.processPattern(config.getPeriodInSecondsPattern(), msg.getMetaData()));
|
||||
} else {
|
||||
throw new RuntimeException("Can't parse period in seconds from metadata using pattern: " + config.getPeriodInSecondsPattern());
|
||||
}
|
||||
} else {
|
||||
periodInSeconds = config.getPeriodInSeconds();
|
||||
}
|
||||
return TimeUnit.SECONDS.toMillis(periodInSeconds);
|
||||
}
|
||||
|
||||
private boolean isParsable(TbMsg msg, String pattern) {
|
||||
return NumberUtils.isParsable(TbNodeUtils.processPattern(pattern, msg.getMetaData()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
pendingMsgs.clear();
|
||||
|
||||
@ -24,12 +24,15 @@ public class TbMsgDelayNodeConfiguration implements NodeConfiguration<TbMsgDelay
|
||||
|
||||
private int periodInSeconds;
|
||||
private int maxPendingMsgs;
|
||||
private String periodInSecondsPattern;
|
||||
private boolean useMetadataPeriodInSecondsPatterns;
|
||||
|
||||
@Override
|
||||
public TbMsgDelayNodeConfiguration defaultConfiguration() {
|
||||
TbMsgDelayNodeConfiguration configuration = new TbMsgDelayNodeConfiguration();
|
||||
configuration.setPeriodInSeconds(60);
|
||||
configuration.setMaxPendingMsgs(1000);
|
||||
configuration.setUseMetadataPeriodInSecondsPatterns(false);
|
||||
return configuration;
|
||||
}
|
||||
}
|
||||
|
||||
@ -25,6 +25,7 @@ import org.thingsboard.rule.engine.api.RuleNode;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.rule.engine.util.EntitiesAlarmOriginatorIdAsyncLoader;
|
||||
import org.thingsboard.rule.engine.util.EntitiesCustomerIdAsyncLoader;
|
||||
import org.thingsboard.rule.engine.util.EntitiesRelatedEntityIdAsyncLoader;
|
||||
import org.thingsboard.rule.engine.util.EntitiesTenantIdAsyncLoader;
|
||||
@ -39,9 +40,10 @@ import java.util.HashSet;
|
||||
type = ComponentType.TRANSFORMATION,
|
||||
name = "change originator",
|
||||
configClazz = TbChangeOriginatorNodeConfiguration.class,
|
||||
nodeDescription = "Change Message Originator To Tenant/Customer/Related Entity",
|
||||
nodeDescription = "Change Message Originator To Tenant/Customer/Related Entity/Alarm Originator",
|
||||
nodeDetails = "Related Entity found using configured relation direction and Relation Type. " +
|
||||
"If multiple Related Entities are found, only first Entity is used as new Originator, other entities are discarded. ",
|
||||
"If multiple Related Entities are found, only first Entity is used as new Originator, other entities are discarded.<br/>" +
|
||||
"Alarm Originator found only in case original Originator is <code>Alarm</code> entity.",
|
||||
uiResources = {"static/rulenode/rulenode-core-config.js", "static/rulenode/rulenode-core-config.css"},
|
||||
configDirective = "tbTransformationNodeChangeOriginatorConfig",
|
||||
icon = "find_replace"
|
||||
@ -51,6 +53,7 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode {
|
||||
protected static final String CUSTOMER_SOURCE = "CUSTOMER";
|
||||
protected static final String TENANT_SOURCE = "TENANT";
|
||||
protected static final String RELATED_SOURCE = "RELATED";
|
||||
protected static final String ALARM_ORIGINATOR_SOURCE = "ALARM_ORIGINATOR";
|
||||
|
||||
private TbChangeOriginatorNodeConfiguration config;
|
||||
|
||||
@ -80,13 +83,15 @@ public class TbChangeOriginatorNode extends TbAbstractTransformNode {
|
||||
return EntitiesTenantIdAsyncLoader.findEntityIdAsync(ctx, original);
|
||||
case RELATED_SOURCE:
|
||||
return EntitiesRelatedEntityIdAsyncLoader.findEntityAsync(ctx, original, config.getRelationsQuery());
|
||||
case ALARM_ORIGINATOR_SOURCE:
|
||||
return EntitiesAlarmOriginatorIdAsyncLoader.findEntityIdAsync(ctx, original);
|
||||
default:
|
||||
return Futures.immediateFailedFuture(new IllegalStateException("Unexpected originator source " + config.getOriginatorSource()));
|
||||
}
|
||||
}
|
||||
|
||||
private void validateConfig(TbChangeOriginatorNodeConfiguration conf) {
|
||||
HashSet<String> knownSources = Sets.newHashSet(CUSTOMER_SOURCE, TENANT_SOURCE, RELATED_SOURCE);
|
||||
HashSet<String> knownSources = Sets.newHashSet(CUSTOMER_SOURCE, TENANT_SOURCE, RELATED_SOURCE, ALARM_ORIGINATOR_SOURCE);
|
||||
if (!knownSources.contains(conf.getOriginatorSource())) {
|
||||
log.error("Unsupported source [{}] for TbChangeOriginatorNode", conf.getOriginatorSource());
|
||||
throw new IllegalArgumentException("Unsupported source TbChangeOriginatorNode" + conf.getOriginatorSource());
|
||||
|
||||
@ -0,0 +1,44 @@
|
||||
/**
|
||||
* Copyright © 2016-2019 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.util;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
|
||||
public class EntitiesAlarmOriginatorIdAsyncLoader {
|
||||
|
||||
public static ListenableFuture<EntityId> findEntityIdAsync(TbContext ctx, EntityId original) {
|
||||
|
||||
switch (original.getEntityType()) {
|
||||
case ALARM:
|
||||
return getAlarmOriginatorAsync(ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), (AlarmId) original));
|
||||
default:
|
||||
return Futures.immediateFailedFuture(new TbNodeException("Unexpected original EntityType " + original));
|
||||
}
|
||||
}
|
||||
|
||||
private static ListenableFuture<EntityId> getAlarmOriginatorAsync(ListenableFuture<Alarm> future) {
|
||||
return Futures.transformAsync(future, in -> {
|
||||
return in != null ? Futures.immediateFuture(in.getOriginator())
|
||||
: Futures.immediateFuture(null);
|
||||
});
|
||||
}
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
Loading…
x
Reference in New Issue
Block a user