added tbel metrics, refactored JS stats
This commit is contained in:
parent
f21275b4bd
commit
0cc55ab989
@ -28,7 +28,6 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.rule.engine.api.MailService;
|
||||
@ -399,10 +398,6 @@ public class ActorSystemContext {
|
||||
@Getter
|
||||
private ClaimDevicesService claimDevicesService;
|
||||
|
||||
@Autowired
|
||||
@Getter
|
||||
private JsInvokeStats jsInvokeStats;
|
||||
|
||||
//TODO: separate context for TbCore and TbRuleEngine
|
||||
@Autowired(required = false)
|
||||
@Getter
|
||||
@ -527,17 +522,6 @@ public class ActorSystemContext {
|
||||
this.localCacheType = "caffeine".equals(cacheType);
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelayString = "${actors.statistics.js_print_interval_ms}")
|
||||
public void printStats() {
|
||||
if (statisticsEnabled) {
|
||||
if (jsInvokeStats.getRequests() > 0 || jsInvokeStats.getResponses() > 0 || jsInvokeStats.getFailures() > 0) {
|
||||
log.info("Rule Engine JS Invoke Stats: requests [{}] responses [{}] failures [{}]",
|
||||
jsInvokeStats.getRequests(), jsInvokeStats.getResponses(), jsInvokeStats.getFailures());
|
||||
jsInvokeStats.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Value("${actors.tenant.create_components_on_init:true}")
|
||||
@Getter
|
||||
private boolean tenantComponentsInitEnabled;
|
||||
|
||||
@ -608,27 +608,6 @@ public class DefaultTbContext implements TbContext {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void logJsEvalRequest() {
|
||||
if (mainCtx.isStatisticsEnabled()) {
|
||||
mainCtx.getJsInvokeStats().incrementRequests();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void logJsEvalResponse() {
|
||||
if (mainCtx.isStatisticsEnabled()) {
|
||||
mainCtx.getJsInvokeStats().incrementResponses();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void logJsEvalFailure() {
|
||||
if (mainCtx.isStatisticsEnabled()) {
|
||||
mainCtx.getJsInvokeStats().incrementFailures();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getServiceId() {
|
||||
return mainCtx.getServiceInfoProvider().getServiceId();
|
||||
|
||||
@ -1,83 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.stats;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.actors.JsInvokeStats;
|
||||
import org.thingsboard.server.common.stats.StatsCounter;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.common.stats.StatsType;
|
||||
|
||||
@Service
|
||||
public class DefaultJsInvokeStats implements JsInvokeStats {
|
||||
private static final String REQUESTS = "requests";
|
||||
private static final String RESPONSES = "responses";
|
||||
private static final String FAILURES = "failures";
|
||||
|
||||
private StatsCounter requestsCounter;
|
||||
private StatsCounter responsesCounter;
|
||||
private StatsCounter failuresCounter;
|
||||
|
||||
@Autowired
|
||||
private StatsFactory statsFactory;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
String key = StatsType.JS_INVOKE.getName();
|
||||
this.requestsCounter = statsFactory.createStatsCounter(key, REQUESTS);
|
||||
this.responsesCounter = statsFactory.createStatsCounter(key, RESPONSES);
|
||||
this.failuresCounter = statsFactory.createStatsCounter(key, FAILURES);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementRequests(int amount) {
|
||||
requestsCounter.add(amount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementResponses(int amount) {
|
||||
responsesCounter.add(amount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementFailures(int amount) {
|
||||
failuresCounter.add(amount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRequests() {
|
||||
return requestsCounter.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getResponses() {
|
||||
return responsesCounter.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getFailures() {
|
||||
return failuresCounter.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
requestsCounter.clear();
|
||||
responsesCounter.clear();
|
||||
failuresCounter.clear();
|
||||
}
|
||||
}
|
||||
@ -500,8 +500,6 @@ actors:
|
||||
statistics:
|
||||
# Enable/disable actor statistics
|
||||
enabled: "${ACTORS_STATISTICS_ENABLED:true}"
|
||||
# Frequency of printing the JS executor statistics
|
||||
js_print_interval_ms: "${ACTORS_JS_STATISTICS_PRINT_INTERVAL_MS:10000}"
|
||||
# Actors statistic persistence frequency in milliseconds
|
||||
persist_frequency: "${ACTORS_STATISTICS_PERSIST_FREQUENCY:3600000}"
|
||||
|
||||
|
||||
@ -1,44 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2024 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.actors;
|
||||
|
||||
public interface JsInvokeStats {
|
||||
default void incrementRequests() {
|
||||
incrementRequests(1);
|
||||
}
|
||||
|
||||
void incrementRequests(int amount);
|
||||
|
||||
default void incrementResponses() {
|
||||
incrementResponses(1);
|
||||
}
|
||||
|
||||
void incrementResponses(int amount);
|
||||
|
||||
default void incrementFailures() {
|
||||
incrementFailures(1);
|
||||
}
|
||||
|
||||
void incrementFailures(int amount);
|
||||
|
||||
int getRequests();
|
||||
|
||||
int getResponses();
|
||||
|
||||
int getFailures();
|
||||
|
||||
void reset();
|
||||
}
|
||||
@ -20,10 +20,14 @@ import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.stats.StatsCounter;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.common.stats.StatsType;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
@ -32,22 +36,31 @@ import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static java.lang.String.format;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractScriptInvokeService implements ScriptInvokeService {
|
||||
|
||||
protected final Map<UUID, BlockedScriptInfo> disabledScripts = new ConcurrentHashMap<>();
|
||||
private final AtomicInteger pushedMsgs = new AtomicInteger(0);
|
||||
private final AtomicInteger invokeMsgs = new AtomicInteger(0);
|
||||
private final AtomicInteger evalMsgs = new AtomicInteger(0);
|
||||
protected final AtomicInteger failedMsgs = new AtomicInteger(0);
|
||||
protected final AtomicInteger timeoutMsgs = new AtomicInteger(0);
|
||||
private static final String REQUESTS = "requests";
|
||||
private static final String INVOKE_RESPONSES = "invoke_responses";
|
||||
private static final String EVAL_RESPONSES = "eval_responses";
|
||||
private static final String FAILURES = "failures";
|
||||
private static final String TIMEOUTS = "timeouts";
|
||||
|
||||
private final FutureCallback<UUID> evalCallback = new ScriptStatCallback<>(evalMsgs, timeoutMsgs, failedMsgs);
|
||||
private final FutureCallback<Object> invokeCallback = new ScriptStatCallback<>(invokeMsgs, timeoutMsgs, failedMsgs);
|
||||
protected final Map<UUID, BlockedScriptInfo> disabledScripts = new ConcurrentHashMap<>();
|
||||
|
||||
private StatsCounter requestsCounter;
|
||||
private StatsCounter invokeResponsesCounter;
|
||||
private StatsCounter evalResponsesCounter;
|
||||
private StatsCounter failuresCounter;
|
||||
private StatsCounter timeoutsCounter;
|
||||
|
||||
private FutureCallback<UUID> evalCallback;
|
||||
private FutureCallback<Object> invokeCallback;
|
||||
|
||||
@Autowired
|
||||
private StatsFactory statsFactory;
|
||||
|
||||
protected ScheduledExecutorService timeoutExecutorService;
|
||||
|
||||
@ -76,6 +89,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService
|
||||
protected abstract boolean isScriptPresent(UUID scriptId);
|
||||
|
||||
protected abstract boolean isExecEnabled(TenantId tenantId);
|
||||
|
||||
protected abstract void reportExecution(TenantId tenantId, CustomerId customerId);
|
||||
|
||||
protected abstract ListenableFuture<UUID> doEvalScript(TenantId tenantId, ScriptType scriptType, String scriptBody, UUID scriptId, String[] argNames);
|
||||
@ -85,6 +99,14 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService
|
||||
protected abstract void doRelease(UUID scriptId) throws Exception;
|
||||
|
||||
public void init() {
|
||||
String key = getStatsType().getName();
|
||||
this.requestsCounter = statsFactory.createStatsCounter(key, REQUESTS);
|
||||
this.invokeResponsesCounter = statsFactory.createStatsCounter(key, INVOKE_RESPONSES);
|
||||
this.evalResponsesCounter = statsFactory.createStatsCounter(key, EVAL_RESPONSES);
|
||||
this.failuresCounter = statsFactory.createStatsCounter(key, FAILURES);
|
||||
this.timeoutsCounter = statsFactory.createStatsCounter(key, TIMEOUTS);
|
||||
this.evalCallback = new ScriptStatCallback<>(evalResponsesCounter, timeoutsCounter, failuresCounter);
|
||||
this.invokeCallback = new ScriptStatCallback<>(invokeResponsesCounter, timeoutsCounter, failuresCounter);
|
||||
if (getMaxEvalRequestsTimeout() > 0 || getMaxInvokeRequestsTimeout() > 0) {
|
||||
timeoutExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("script-timeout");
|
||||
}
|
||||
@ -98,11 +120,11 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService
|
||||
|
||||
public void printStats() {
|
||||
if (isStatsEnabled()) {
|
||||
int pushed = pushedMsgs.getAndSet(0);
|
||||
int invoked = invokeMsgs.getAndSet(0);
|
||||
int evaluated = evalMsgs.getAndSet(0);
|
||||
int failed = failedMsgs.getAndSet(0);
|
||||
int timedOut = timeoutMsgs.getAndSet(0);
|
||||
int pushed = requestsCounter.getAndClear();
|
||||
int invoked = invokeResponsesCounter.getAndClear();
|
||||
int evaluated = evalResponsesCounter.getAndClear();
|
||||
int failed = failuresCounter.getAndClear();
|
||||
int timedOut = timeoutsCounter.getAndClear();
|
||||
if (pushed > 0 || invoked > 0 || evaluated > 0 || failed > 0 || timedOut > 0) {
|
||||
log.info("{}: pushed [{}] received [{}] invoke [{}] eval [{}] failed [{}] timedOut [{}]",
|
||||
getStatsName(), pushed, invoked + evaluated, invoked, evaluated, failed, timedOut);
|
||||
@ -117,7 +139,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService
|
||||
return error(format("Script body exceeds maximum allowed size of %s symbols", getMaxScriptBodySize()));
|
||||
}
|
||||
UUID scriptId = UUID.randomUUID();
|
||||
pushedMsgs.incrementAndGet();
|
||||
requestsCounter.increment();
|
||||
return withTimeoutAndStatsCallback(scriptId, null,
|
||||
doEvalScript(tenantId, scriptType, scriptBody, scriptId, argNames), evalCallback, getMaxEvalRequestsTimeout());
|
||||
} else {
|
||||
@ -139,7 +161,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService
|
||||
return Futures.immediateFailedFuture(handleScriptException(scriptId, null, t));
|
||||
}
|
||||
reportExecution(tenantId, customerId);
|
||||
pushedMsgs.incrementAndGet();
|
||||
requestsCounter.increment();
|
||||
log.trace("[{}] InvokeScript uuid {} with timeout {}ms", tenantId, scriptId, getMaxInvokeRequestsTimeout());
|
||||
var task = doInvokeFunction(scriptId, args);
|
||||
|
||||
@ -274,4 +296,6 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService
|
||||
private <T> ListenableFuture<T> error(String message) {
|
||||
return Futures.immediateFailedFuture(new RuntimeException(message));
|
||||
}
|
||||
|
||||
protected abstract StatsType getStatsType();
|
||||
}
|
||||
|
||||
@ -19,29 +19,29 @@ import com.google.common.util.concurrent.FutureCallback;
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.stats.StatsCounter;
|
||||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Slf4j
|
||||
@AllArgsConstructor
|
||||
public class ScriptStatCallback<T> implements FutureCallback<T> {
|
||||
|
||||
private final AtomicInteger successMsgs;
|
||||
private final AtomicInteger timeoutMsgs;
|
||||
private final AtomicInteger failedMsgs;
|
||||
private final StatsCounter successMsgs;
|
||||
private final StatsCounter timeoutMsgs;
|
||||
private final StatsCounter failedMsgs;
|
||||
|
||||
@Override
|
||||
public void onSuccess(@Nullable T result) {
|
||||
successMsgs.incrementAndGet();
|
||||
successMsgs.increment();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
if (t instanceof TimeoutException || (t.getCause() != null && t.getCause() instanceof TimeoutException)) {
|
||||
timeoutMsgs.incrementAndGet();
|
||||
timeoutMsgs.increment();
|
||||
} else {
|
||||
failedMsgs.incrementAndGet();
|
||||
failedMsgs.increment();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -26,6 +26,7 @@ import org.thingsboard.script.api.ScriptType;
|
||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.stats.StatsType;
|
||||
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
|
||||
import org.thingsboard.server.common.stats.TbApiUsageStateClient;
|
||||
|
||||
@ -117,4 +118,8 @@ public abstract class AbstractJsInvokeService extends AbstractScriptInvokeServic
|
||||
.hash().toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StatsType getStatsType() {
|
||||
return StatsType.JS_INVOKE;
|
||||
}
|
||||
}
|
||||
|
||||
@ -44,6 +44,7 @@ import org.thingsboard.script.api.TbScriptException;
|
||||
import org.thingsboard.server.common.data.ApiUsageRecordKey;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.stats.StatsType;
|
||||
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
|
||||
import org.thingsboard.server.common.stats.TbApiUsageStateClient;
|
||||
|
||||
@ -255,4 +256,9 @@ public class DefaultTbelInvokeService extends AbstractScriptInvokeService implem
|
||||
protected long getMaxEvalRequestsTimeout() {
|
||||
return maxInvokeRequestsTimeout * 2;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StatsType getStatsType() {
|
||||
return StatsType.TBEL_INVOKE;
|
||||
}
|
||||
}
|
||||
|
||||
@ -41,6 +41,10 @@ public class DefaultCounter {
|
||||
return aiCounter.get();
|
||||
}
|
||||
|
||||
public int getAndClear() {
|
||||
return aiCounter.getAndSet(0);
|
||||
}
|
||||
|
||||
public void add(int delta){
|
||||
aiCounter.addAndGet(delta);
|
||||
micrometerCounter.increment(delta);
|
||||
|
||||
@ -20,6 +20,7 @@ public enum StatsType {
|
||||
CORE("core"),
|
||||
TRANSPORT("transport"),
|
||||
JS_INVOKE("jsInvoke"),
|
||||
TBEL_INVOKE("tbelInvoke"),
|
||||
RATE_EXECUTOR("rateExecutor"),
|
||||
HOUSEKEEPER("housekeeper"),
|
||||
EDGE("edge");
|
||||
|
||||
@ -223,8 +223,8 @@
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 12,
|
||||
"w": 24,
|
||||
"h": 10,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 10
|
||||
},
|
||||
@ -303,6 +303,100 @@
|
||||
"align": false,
|
||||
"alignLevel": null
|
||||
}
|
||||
},
|
||||
{
|
||||
"aliasColors": {},
|
||||
"bars": false,
|
||||
"dashLength": 10,
|
||||
"dashes": false,
|
||||
"datasource": null,
|
||||
"fieldConfig": {
|
||||
"defaults": {},
|
||||
"overrides": []
|
||||
},
|
||||
"fill": 1,
|
||||
"fillGradient": 0,
|
||||
"gridPos": {
|
||||
"h": 10,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 10
|
||||
},
|
||||
"hiddenSeries": false,
|
||||
"id": 19,
|
||||
"legend": {
|
||||
"avg": false,
|
||||
"current": false,
|
||||
"max": false,
|
||||
"min": false,
|
||||
"show": true,
|
||||
"total": false,
|
||||
"values": false
|
||||
},
|
||||
"lines": true,
|
||||
"linewidth": 1,
|
||||
"nullPointMode": "null",
|
||||
"options": {
|
||||
"alertThreshold": true
|
||||
},
|
||||
"percentage": false,
|
||||
"pluginVersion": "7.5.4",
|
||||
"pointradius": 2,
|
||||
"points": false,
|
||||
"renderer": "flot",
|
||||
"seriesOverrides": [],
|
||||
"spaceLength": 10,
|
||||
"stack": false,
|
||||
"steppedLine": false,
|
||||
"targets": [
|
||||
{
|
||||
"exemplar": true,
|
||||
"expr": "sum by(statsName) (increase(tbelInvoke_total[1m]))",
|
||||
"interval": "",
|
||||
"legendFormat": "{{statsName}}",
|
||||
"refId": "A"
|
||||
}
|
||||
],
|
||||
"thresholds": [],
|
||||
"timeFrom": null,
|
||||
"timeRegions": [],
|
||||
"timeShift": null,
|
||||
"title": "TbelInvoke Stats",
|
||||
"tooltip": {
|
||||
"shared": true,
|
||||
"sort": 0,
|
||||
"value_type": "individual"
|
||||
},
|
||||
"type": "graph",
|
||||
"xaxis": {
|
||||
"buckets": null,
|
||||
"mode": "time",
|
||||
"name": null,
|
||||
"show": true,
|
||||
"values": []
|
||||
},
|
||||
"yaxes": [
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
},
|
||||
{
|
||||
"format": "short",
|
||||
"label": null,
|
||||
"logBase": 1,
|
||||
"max": null,
|
||||
"min": null,
|
||||
"show": true
|
||||
}
|
||||
],
|
||||
"yaxis": {
|
||||
"align": false,
|
||||
"alignLevel": null
|
||||
}
|
||||
}
|
||||
],
|
||||
"schemaVersion": 27,
|
||||
|
||||
@ -373,12 +373,6 @@ public interface TbContext {
|
||||
|
||||
ScriptEngine createScriptEngine(ScriptLanguage scriptLang, String script, String... argNames);
|
||||
|
||||
void logJsEvalRequest();
|
||||
|
||||
void logJsEvalResponse();
|
||||
|
||||
void logJsEvalFailure();
|
||||
|
||||
String getServiceId();
|
||||
|
||||
EventLoopGroup getSharedEventLoop();
|
||||
|
||||
@ -70,10 +70,8 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
|
||||
}
|
||||
|
||||
private ListenableFuture<TbAlarmResult> clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
|
||||
ctx.logJsEvalRequest();
|
||||
ListenableFuture<JsonNode> asyncDetails = buildAlarmDetails(msg, alarm.getDetails());
|
||||
return Futures.transform(asyncDetails, details -> {
|
||||
ctx.logJsEvalResponse();
|
||||
AlarmApiCallResult result = ctx.getAlarmService().clearAlarm(ctx.getTenantId(), alarm.getId(), System.currentTimeMillis(), details);
|
||||
if (result.isSuccessful()) {
|
||||
return new TbAlarmResult(false, false, result.isCleared(), result.getAlarm());
|
||||
|
||||
@ -119,15 +119,11 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
|
||||
ListenableFuture<JsonNode> asyncDetails;
|
||||
boolean buildDetails = !config.isUseMessageAlarmData() || config.isOverwriteAlarmDetails();
|
||||
if (buildDetails) {
|
||||
ctx.logJsEvalRequest();
|
||||
asyncDetails = buildAlarmDetails(msg, null);
|
||||
} else {
|
||||
asyncDetails = Futures.immediateFuture(null);
|
||||
}
|
||||
ListenableFuture<Alarm> asyncAlarm = Futures.transform(asyncDetails, details -> {
|
||||
if (buildDetails) {
|
||||
ctx.logJsEvalResponse();
|
||||
}
|
||||
Alarm newAlarm;
|
||||
if (msgAlarm != null) {
|
||||
newAlarm = msgAlarm;
|
||||
@ -148,15 +144,11 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
|
||||
ListenableFuture<JsonNode> asyncDetails;
|
||||
boolean buildDetails = !config.isUseMessageAlarmData() || config.isOverwriteAlarmDetails();
|
||||
if (buildDetails) {
|
||||
ctx.logJsEvalRequest();
|
||||
asyncDetails = buildAlarmDetails(msg, existingAlarm.getDetails());
|
||||
} else {
|
||||
asyncDetails = Futures.immediateFuture(null);
|
||||
}
|
||||
ListenableFuture<AlarmApiCallResult> asyncUpdated = Futures.transform(asyncDetails, details -> {
|
||||
if (buildDetails) {
|
||||
ctx.logJsEvalResponse();
|
||||
}
|
||||
if (msgAlarm != null) {
|
||||
existingAlarm.setSeverity(msgAlarm.getSeverity());
|
||||
existingAlarm.setPropagate(msgAlarm.isPropagate());
|
||||
|
||||
@ -76,18 +76,15 @@ public class TbLogNode implements TbNode {
|
||||
return;
|
||||
}
|
||||
|
||||
ctx.logJsEvalRequest();
|
||||
Futures.addCallback(scriptEngine.executeToStringAsync(msg), new FutureCallback<String>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable String result) {
|
||||
ctx.logJsEvalResponse();
|
||||
log.info(result);
|
||||
ctx.tellSuccess(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
ctx.logJsEvalResponse();
|
||||
ctx.tellFailure(msg, t);
|
||||
}
|
||||
}, MoreExecutors.directExecutor()); //usually js responses runs on js callback executor
|
||||
|
||||
@ -161,10 +161,8 @@ public class TbMsgGeneratorNode implements TbNode {
|
||||
prevMsg = ctx.newMsg(queueName, TbMsg.EMPTY_STRING, originatorId, msg.getCustomerId(), TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
|
||||
}
|
||||
if (initialized.get()) {
|
||||
ctx.logJsEvalRequest();
|
||||
return Futures.transformAsync(scriptEngine.executeGenerateAsync(prevMsg), generated -> {
|
||||
log.trace("generate process response, generated {}, config {}", generated, config);
|
||||
ctx.logJsEvalResponse();
|
||||
prevMsg = ctx.newMsg(queueName, generated.getType(), originatorId, msg.getCustomerId(), generated.getMetaData(), generated.getData());
|
||||
return Futures.immediateFuture(prevMsg);
|
||||
}, MoreExecutors.directExecutor()); //usually it runs on js-executor-remote-callback thread pool
|
||||
|
||||
@ -61,15 +61,12 @@ public class TbJsFilterNode implements TbNode {
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
ctx.logJsEvalRequest();
|
||||
withCallback(scriptEngine.executeFilterAsync(msg),
|
||||
filterResult -> {
|
||||
ctx.logJsEvalResponse();
|
||||
ctx.tellNext(msg, filterResult ? TbNodeConnectionType.TRUE : TbNodeConnectionType.FALSE);
|
||||
},
|
||||
t -> {
|
||||
ctx.tellFailure(msg, t);
|
||||
ctx.logJsEvalFailure();
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
|
||||
@ -62,17 +62,14 @@ public class TbJsSwitchNode implements TbNode {
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
ctx.logJsEvalRequest();
|
||||
Futures.addCallback(scriptEngine.executeSwitchAsync(msg), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable Set<String> result) {
|
||||
ctx.logJsEvalResponse();
|
||||
processSwitch(ctx, msg, result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
ctx.logJsEvalFailure();
|
||||
ctx.tellFailure(msg, t);
|
||||
}
|
||||
}, MoreExecutors.directExecutor()); //usually runs in a callbackExecutor
|
||||
|
||||
@ -58,13 +58,11 @@ public class TbTransformMsgNode extends TbAbstractTransformNode<TbTransformMsgNo
|
||||
|
||||
@Override
|
||||
protected ListenableFuture<List<TbMsg>> transform(TbContext ctx, TbMsg msg) {
|
||||
ctx.logJsEvalRequest();
|
||||
return scriptEngine.executeUpdateAsync(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void transformFailure(TbContext ctx, TbMsg msg, Throwable t) {
|
||||
ctx.logJsEvalFailure();
|
||||
super.transformFailure(ctx, msg, t);
|
||||
}
|
||||
|
||||
|
||||
@ -119,8 +119,8 @@ class TbCreateAlarmNodeTest {
|
||||
delete metadata.prevAlarmDetails;
|
||||
//now metadata is the same as it comes IN this rule node
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
return details;""");
|
||||
assertThat(config.getAlarmDetailsBuildTbel()).isEqualTo("""
|
||||
\
|
||||
@ -131,8 +131,8 @@ class TbCreateAlarmNodeTest {
|
||||
metadata.remove('prevAlarmDetails');
|
||||
//now metadata is the same as it comes IN this rule node
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
return details;""");
|
||||
assertThat(config.getSeverity()).isEqualTo(AlarmSeverity.CRITICAL.name());
|
||||
assertThat(config.isPropagate()).isFalse();
|
||||
@ -242,9 +242,7 @@ class TbCreateAlarmNodeTest {
|
||||
// THEN
|
||||
|
||||
// verify alarm details script evaluation
|
||||
then(ctxMock).should().logJsEvalRequest();
|
||||
then(alarmDetailsScriptMock).should().executeJsonAsync(incomingMsg);
|
||||
then(ctxMock).should().logJsEvalResponse();
|
||||
|
||||
// verify we called createAlarm() with correct AlarmCreateOrUpdateActiveRequest
|
||||
then(alarmServiceMock).should().createAlarm(expectedCreateAlarmRequest);
|
||||
@ -411,9 +409,7 @@ class TbCreateAlarmNodeTest {
|
||||
// THEN
|
||||
|
||||
// verify alarm details script evaluation
|
||||
then(ctxMock).should().logJsEvalRequest();
|
||||
then(alarmDetailsScriptMock).should().executeJsonAsync(incomingMsg);
|
||||
then(ctxMock).should().logJsEvalResponse();
|
||||
|
||||
// verify we called createAlarm() with correct AlarmCreateOrUpdateActiveRequest
|
||||
then(alarmServiceMock).should().createAlarm(expectedCreateAlarmRequest);
|
||||
@ -601,14 +597,12 @@ class TbCreateAlarmNodeTest {
|
||||
// THEN
|
||||
|
||||
// verify alarm details script evaluation
|
||||
then(ctxMock).should().logJsEvalRequest();
|
||||
var dummyMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
then(alarmDetailsScriptMock).should().executeJsonAsync(dummyMsgCaptor.capture());
|
||||
TbMsg actualDummyMsg = dummyMsgCaptor.getValue();
|
||||
assertThat(actualDummyMsg.getType()).isEqualTo(incomingMsg.getType());
|
||||
assertThat(actualDummyMsg.getData()).isEqualTo(incomingMsg.getData());
|
||||
assertThat(actualDummyMsg.getMetaData().getData()).containsEntry("prevAlarmDetails", JacksonUtil.toString(oldAlarmDetails));
|
||||
then(ctxMock).should().logJsEvalResponse();
|
||||
|
||||
// verify we called updateAlarm() with correct AlarmUpdateRequest
|
||||
then(alarmServiceMock).should().updateAlarm(expectedUpdateAlarmRequest);
|
||||
@ -773,9 +767,7 @@ class TbCreateAlarmNodeTest {
|
||||
// THEN
|
||||
|
||||
// verify alarm details script was not evaluated
|
||||
then(ctxMock).should(never()).logJsEvalRequest();
|
||||
then(alarmDetailsScriptMock).should(never()).executeJsonAsync(any());
|
||||
then(ctxMock).should(never()).logJsEvalResponse();
|
||||
|
||||
// verify we called createAlarm() with correct AlarmCreateOrUpdateActiveRequest
|
||||
then(alarmServiceMock).should().createAlarm(expectedCreateAlarmRequest);
|
||||
@ -960,14 +952,12 @@ class TbCreateAlarmNodeTest {
|
||||
// THEN
|
||||
|
||||
// verify alarm details script evaluation
|
||||
then(ctxMock).should().logJsEvalRequest();
|
||||
var dummyMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
then(alarmDetailsScriptMock).should().executeJsonAsync(dummyMsgCaptor.capture());
|
||||
TbMsg actualDummyMsg = dummyMsgCaptor.getValue();
|
||||
assertThat(actualDummyMsg.getType()).isEqualTo(incomingMsg.getType());
|
||||
assertThat(actualDummyMsg.getData()).isEqualTo(incomingMsg.getData());
|
||||
assertThat(actualDummyMsg.getMetaData().getData()).containsEntry("prevAlarmDetails", JacksonUtil.toString(oldAlarmDetails));
|
||||
then(ctxMock).should().logJsEvalResponse();
|
||||
|
||||
// verify we called updateAlarm() with correct AlarmUpdateRequest
|
||||
then(alarmServiceMock).should().updateAlarm(expectedUpdateAlarmRequest);
|
||||
@ -1141,14 +1131,12 @@ class TbCreateAlarmNodeTest {
|
||||
// THEN
|
||||
|
||||
// verify alarm details script evaluation
|
||||
then(ctxMock).should().logJsEvalRequest();
|
||||
var dummyMsgCaptor = ArgumentCaptor.forClass(TbMsg.class);
|
||||
then(alarmDetailsScriptMock).should().executeJsonAsync(dummyMsgCaptor.capture());
|
||||
TbMsg actualDummyMsg = dummyMsgCaptor.getValue();
|
||||
assertThat(actualDummyMsg.getType()).isEqualTo(incomingMsg.getType());
|
||||
assertThat(actualDummyMsg.getData()).isEqualTo(incomingMsg.getData());
|
||||
assertThat(actualDummyMsg.getMetaData().getData()).containsEntry("prevAlarmDetails", JacksonUtil.toString(alarmDetails));
|
||||
then(ctxMock).should().logJsEvalResponse();
|
||||
|
||||
// verify we called updateAlarm() with correct AlarmUpdateRequest
|
||||
then(alarmServiceMock).should().updateAlarm(expectedUpdateAlarmRequest);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user