diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 64e6de0f1b..7b70554ddb 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -107,6 +107,7 @@ class DefaultTbContext implements TbContext { if (nodeCtx.getSelf().isDebugMode()) { relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th)); } + msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId()); nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null)); } @@ -216,6 +217,7 @@ class DefaultTbContext implements TbContext { if (nodeCtx.getSelf().isDebugMode()) { mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "ACK", null); } + tbMsg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId()); tbMsg.getCallback().onSuccess(); } @@ -252,26 +254,26 @@ class DefaultTbContext implements TbContext { } public TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId) { - return entityCreatedMsg(customer, customer.getId(), ruleNodeId); + return entityActionMsg(customer, customer.getId(), ruleNodeId, DataConstants.ENTITY_CREATED); } public TbMsg deviceCreatedMsg(Device device, RuleNodeId ruleNodeId) { - return entityCreatedMsg(device, device.getId(), ruleNodeId); + return entityActionMsg(device, device.getId(), ruleNodeId, DataConstants.ENTITY_CREATED); } public TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId) { - return entityCreatedMsg(asset, asset.getId(), ruleNodeId); + return entityActionMsg(asset, asset.getId(), ruleNodeId, DataConstants.ENTITY_CREATED); } - public TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId) { - return entityCreatedMsg(alarm, alarm.getId(), ruleNodeId); + public TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action) { + return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action); } - public TbMsg entityCreatedMsg(E entity, I id, RuleNodeId ruleNodeId) { + public TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action) { try { - return TbMsg.newMsg(DataConstants.ENTITY_CREATED, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity))); + return TbMsg.newMsg(action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity))); } catch (JsonProcessingException | IllegalArgumentException e) { - throw new RuntimeException("Failed to process " + id.getEntityType().name().toLowerCase() + " created msg: " + e); + throw new RuntimeException("Failed to process " + id.getEntityType().name().toLowerCase() + " " + action + " msg: " + e); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java index 7583ea553e..db55ff8edf 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java @@ -103,7 +103,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor submitExecutor.submit(() -> { log.trace("[{}] Creating callback for message: {}", id, msg.getValue()); ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); @@ -194,6 +194,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< if (!ctx.getFailedMap().isEmpty()) { printFirstOrAll(configuration, ctx, ctx.getFailedMap(), "Failed"); } + ctx.printProfilerStats(); + TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result); if (statsEnabled) { stats.log(result, decision.isCommit()); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java index c7925fd759..5712f29eac 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackCallback.java @@ -49,8 +49,14 @@ public class TbMsgPackCallback implements TbMsgCallback { } @Override - public void visit(RuleNodeInfo ruleNodeInfo) { - log.trace("[{}] ON PROCESS: {}", id, ruleNodeInfo); - ctx.visit(id, ruleNodeInfo); + public void onProcessingStart(RuleNodeInfo ruleNodeInfo) { + log.trace("[{}] ON PROCESSING START: {}", id, ruleNodeInfo); + ctx.onProcessingStart(id, ruleNodeInfo); + } + + @Override + public void onProcessingEnd(RuleNodeId ruleNodeId) { + log.trace("[{}] ON PROCESSING END: {}", id, ruleNodeId); + ctx.onProcessingEnd(id, ruleNodeId); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java index 1b7ffd9c30..6d88ed204a 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContext.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.queue; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.RuleEngineException; @@ -24,6 +25,8 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; +import java.util.Comparator; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -31,9 +34,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +@Slf4j public class TbMsgPackProcessingContext { + private final String queueName; private final TbRuleEngineSubmitStrategy submitStrategy; + @Getter + private final boolean profilerEnabled; private final AtomicInteger pendingCount; private final CountDownLatch processingTimeoutLatch = new CountDownLatch(1); @Getter @@ -47,14 +54,20 @@ public class TbMsgPackProcessingContext { private final ConcurrentMap lastRuleNodeMap = new ConcurrentHashMap<>(); - public TbMsgPackProcessingContext(TbRuleEngineSubmitStrategy submitStrategy) { + public TbMsgPackProcessingContext(String queueName, TbRuleEngineSubmitStrategy submitStrategy) { + this.queueName = queueName; this.submitStrategy = submitStrategy; + this.profilerEnabled = log.isDebugEnabled(); this.pendingMap = submitStrategy.getPendingMap(); this.pendingCount = new AtomicInteger(pendingMap.size()); } public boolean await(long packProcessingTimeout, TimeUnit milliseconds) throws InterruptedException { - return processingTimeoutLatch.await(packProcessingTimeout, milliseconds); + boolean success = processingTimeoutLatch.await(packProcessingTimeout, milliseconds); + if (!success && profilerEnabled) { + msgProfilerMap.values().forEach(this::onTimeout); + } + return success; } public void onSuccess(UUID id) { @@ -85,12 +98,53 @@ public class TbMsgPackProcessingContext { } } - public void visit(UUID id, RuleNodeInfo ruleNodeInfo) { + private final ConcurrentHashMap msgProfilerMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap ruleNodeProfilerMap = new ConcurrentHashMap<>(); + + public void onProcessingStart(UUID id, RuleNodeInfo ruleNodeInfo) { lastRuleNodeMap.put(id, ruleNodeInfo); + if (profilerEnabled) { + msgProfilerMap.computeIfAbsent(id, TbMsgProfilerInfo::new).onStart(ruleNodeInfo.getRuleNodeId()); + ruleNodeProfilerMap.putIfAbsent(ruleNodeInfo.getRuleNodeId().getId(), new TbRuleNodeProfilerInfo(ruleNodeInfo)); + } + } + + public void onProcessingEnd(UUID id, RuleNodeId ruleNodeId) { + if (profilerEnabled) { + long processingTime = msgProfilerMap.computeIfAbsent(id, TbMsgProfilerInfo::new).onEnd(ruleNodeId); + if (processingTime > 0) { + ruleNodeProfilerMap.computeIfAbsent(ruleNodeId.getId(), TbRuleNodeProfilerInfo::new).record(processingTime); + } + } + } + + public void onTimeout(TbMsgProfilerInfo profilerInfo) { + Map.Entry ruleNodeInfo = profilerInfo.onTimeout(); + if (ruleNodeInfo != null) { + ruleNodeProfilerMap.computeIfAbsent(ruleNodeInfo.getKey(), TbRuleNodeProfilerInfo::new).record(ruleNodeInfo.getValue()); + } } public RuleNodeInfo getLastVisitedRuleNode(UUID id) { return lastRuleNodeMap.get(id); } + public void printProfilerStats() { + if (profilerEnabled) { + log.debug("Top Rule Nodes by max execution time:"); + ruleNodeProfilerMap.values().stream() + .sorted(Comparator.comparingLong(TbRuleNodeProfilerInfo::getMaxExecutionTime).reversed()).limit(5) + .forEach(info -> log.debug("[{}][{}] max execution time: {}. {}", queueName, info.getRuleNodeId(), info.getMaxExecutionTime(), info.getLabel())); + + log.info("Top Rule Nodes by avg execution time:"); + ruleNodeProfilerMap.values().stream() + .sorted(Comparator.comparingDouble(TbRuleNodeProfilerInfo::getAvgExecutionTime).reversed()).limit(5) + .forEach(info -> log.info("[{}][{}] avg execution time: {}. {}", queueName, info.getRuleNodeId(), info.getAvgExecutionTime(), info.getLabel())); + + log.info("Top Rule Nodes by execution count:"); + ruleNodeProfilerMap.values().stream() + .sorted(Comparator.comparingInt(TbRuleNodeProfilerInfo::getExecutionCount).reversed()).limit(5) + .forEach(info -> log.info("[{}][{}] execution count: {}. {}", queueName, info.getRuleNodeId(), info.getExecutionCount(), info.getLabel())); + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgProfilerInfo.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgProfilerInfo.java new file mode 100644 index 0000000000..f66cd1a50a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgProfilerInfo.java @@ -0,0 +1,85 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.msg.queue.RuleNodeInfo; + +import java.util.AbstractMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Slf4j +public class TbMsgProfilerInfo { + private final UUID msgId; + private AtomicLong totalProcessingTime = new AtomicLong(); + private Lock stateLock = new ReentrantLock(); + private RuleNodeId currentRuleNodeId; + private long stateChangeTime; + + public TbMsgProfilerInfo(UUID msgId) { + this.msgId = msgId; + } + + public void onStart(RuleNodeId ruleNodeId) { + long currentTime = System.currentTimeMillis(); + stateLock.lock(); + try { + currentRuleNodeId = ruleNodeId; + stateChangeTime = currentTime; + } finally { + stateLock.unlock(); + } + } + + public long onEnd(RuleNodeId ruleNodeId) { + long currentTime = System.currentTimeMillis(); + stateLock.lock(); + try { + if (ruleNodeId.equals(currentRuleNodeId)) { + long processingTime = currentTime - stateChangeTime; + stateChangeTime = currentTime; + totalProcessingTime.addAndGet(processingTime); + currentRuleNodeId = null; + return processingTime; + } else { + log.trace("[{}] Invalid sequence of rule node processing detected. Expected [{}] but was [{}]", msgId, currentRuleNodeId, ruleNodeId); + return 0; + } + } finally { + stateLock.unlock(); + } + } + + public Map.Entry onTimeout() { + long currentTime = System.currentTimeMillis(); + stateLock.lock(); + try { + if (currentRuleNodeId != null && stateChangeTime > 0) { + long timeoutTime = currentTime - stateChangeTime; + totalProcessingTime.addAndGet(timeoutTime); + return new AbstractMap.SimpleEntry<>(currentRuleNodeId.getId(), timeoutTime); + } + } finally { + stateLock.unlock(); + } + return null; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleNodeProfilerInfo.java b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleNodeProfilerInfo.java new file mode 100644 index 0000000000..c88532fbc3 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleNodeProfilerInfo.java @@ -0,0 +1,75 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import lombok.Getter; +import org.thingsboard.server.common.msg.queue.RuleNodeInfo; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class TbRuleNodeProfilerInfo { + @Getter + private final UUID ruleNodeId; + @Getter + private final String label; + private AtomicInteger executionCount = new AtomicInteger(0); + private AtomicLong executionTime = new AtomicLong(0); + private AtomicLong maxExecutionTime = new AtomicLong(0); + + public TbRuleNodeProfilerInfo(RuleNodeInfo ruleNodeInfo) { + this.ruleNodeId = ruleNodeInfo.getRuleNodeId().getId(); + this.label = ruleNodeInfo.toString(); + } + + public TbRuleNodeProfilerInfo(UUID ruleNodeId) { + this.ruleNodeId = ruleNodeId; + this.label = ""; + } + + public void record(long processingTime) { + executionCount.incrementAndGet(); + executionTime.addAndGet(processingTime); + while (true) { + long value = maxExecutionTime.get(); + if (value >= processingTime) { + break; + } + if (maxExecutionTime.compareAndSet(value, processingTime)) { + break; + } + } + } + + int getExecutionCount() { + return executionCount.get(); + } + + long getMaxExecutionTime() { + return maxExecutionTime.get(); + } + + double getAvgExecutionTime() { + double executionCnt = (double) executionCount.get(); + if (executionCnt > 0) { + return executionTime.get() / executionCnt; + } else { + return 0.0; + } + } + +} \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java index b9741d2433..be299b39b4 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/BatchTbRuleEngineSubmitStrategy.java @@ -68,18 +68,20 @@ public class BatchTbRuleEngineSubmitStrategy extends AbstractTbRuleEngineSubmitS int listSize = orderedMsgList.size(); int startIdx = Math.min(packIdx.get() * batchSize, listSize); int endIdx = Math.min(startIdx + batchSize, listSize); + Map> tmpPack; synchronized (pendingPack) { pendingPack.clear(); for (int i = startIdx; i < endIdx; i++) { IdMsgPair pair = orderedMsgList.get(i); pendingPack.put(pair.uuid, pair.msg); } + tmpPack = new LinkedHashMap<>(pendingPack); } int submitSize = pendingPack.size(); if (log.isDebugEnabled() && submitSize > 0) { log.debug("[{}] submitting [{}] messages to rule engine", queueName, submitSize); } - pendingPack.forEach(msgConsumer); + tmpPack.forEach(msgConsumer); } } diff --git a/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/Oauth2AuthenticationFailureHandler.java b/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/Oauth2AuthenticationFailureHandler.java index 653be85f72..984936874f 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/Oauth2AuthenticationFailureHandler.java +++ b/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/Oauth2AuthenticationFailureHandler.java @@ -30,7 +30,7 @@ import java.nio.charset.StandardCharsets; @Component(value = "oauth2AuthenticationFailureHandler") @ConditionalOnProperty(prefix = "security.oauth2", value = "enabled", havingValue = "true") -public class Oauth2AuthenticationFailureHandler extends SimpleUrlAuthenticationFailureHandler { +public class Oauth2AuthenticationFailureHandler extends SimpleUrlAuthenticationFailureHandler { @Override public void onAuthenticationFailure(HttpServletRequest request, diff --git a/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/Oauth2AuthenticationSuccessHandler.java b/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/Oauth2AuthenticationSuccessHandler.java index 3375d046e2..6641ab0ac4 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/Oauth2AuthenticationSuccessHandler.java +++ b/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/Oauth2AuthenticationSuccessHandler.java @@ -59,7 +59,6 @@ public class Oauth2AuthenticationSuccessHandler extends SimpleUrlAuthenticationS public void onAuthenticationSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException { - String baseUrl = MiscUtils.constructBaseUrl(request); try { OAuth2AuthenticationToken token = (OAuth2AuthenticationToken) authentication; diff --git a/application/src/main/java/org/thingsboard/server/service/security/system/DefaultSystemSecurityService.java b/application/src/main/java/org/thingsboard/server/service/security/system/DefaultSystemSecurityService.java index 11857f25b6..4219dbc609 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/system/DefaultSystemSecurityService.java +++ b/application/src/main/java/org/thingsboard/server/service/security/system/DefaultSystemSecurityService.java @@ -40,17 +40,20 @@ import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.server.common.data.AdminSettings; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.security.UserCredentials; +import org.thingsboard.server.common.data.security.model.SecuritySettings; +import org.thingsboard.server.common.data.security.model.UserPasswordPolicy; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.settings.AdminSettingsService; import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.user.UserServiceImpl; import org.thingsboard.server.service.security.exception.UserPasswordExpiredException; -import org.thingsboard.server.common.data.security.model.SecuritySettings; -import org.thingsboard.server.common.data.security.model.UserPasswordPolicy; +import org.thingsboard.server.utils.MiscUtils; import javax.annotation.Resource; +import javax.servlet.http.HttpServletRequest; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -146,7 +149,7 @@ public class DefaultSystemSecurityService implements SystemSecurityService { if (isPositiveInteger(securitySettings.getPasswordPolicy().getPasswordExpirationPeriodDays())) { if ((userCredentials.getCreatedTime() + TimeUnit.DAYS.toMillis(securitySettings.getPasswordPolicy().getPasswordExpirationPeriodDays())) - < System.currentTimeMillis()) { + < System.currentTimeMillis()) { userCredentials = userService.requestExpiredPasswordReset(tenantId, userCredentials.getId()); throw new UserPasswordExpiredException("User password expired!", userCredentials.getResetToken()); } @@ -197,6 +200,21 @@ public class DefaultSystemSecurityService implements SystemSecurityService { } } + @Override + public String getBaseUrl(TenantId tenantId, CustomerId customerId, HttpServletRequest httpServletRequest) { + String baseUrl; + AdminSettings generalSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "general"); + + JsonNode prohibitDifferentUrl = generalSettings.getJsonValue().get("prohibitDifferentUrl"); + + if (prohibitDifferentUrl != null && prohibitDifferentUrl.asBoolean()) { + baseUrl = generalSettings.getJsonValue().get("baseUrl").asText(); + } else { + baseUrl = MiscUtils.constructBaseUrl(httpServletRequest); + } + return baseUrl; + } + private static boolean isPositiveInteger(Integer val) { return val != null && val.intValue() > 0; } diff --git a/application/src/main/java/org/thingsboard/server/service/security/system/SystemSecurityService.java b/application/src/main/java/org/thingsboard/server/service/security/system/SystemSecurityService.java index afb7834cef..01367d53f1 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/system/SystemSecurityService.java +++ b/application/src/main/java/org/thingsboard/server/service/security/system/SystemSecurityService.java @@ -16,11 +16,14 @@ package org.thingsboard.server.service.security.system; import org.springframework.security.core.AuthenticationException; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.security.UserCredentials; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.common.data.security.model.SecuritySettings; +import javax.servlet.http.HttpServletRequest; + public interface SystemSecurityService { SecuritySettings getSecuritySettings(TenantId tenantId); @@ -31,4 +34,6 @@ public interface SystemSecurityService { void validatePassword(TenantId tenantId, String password, UserCredentials userCredentials) throws DataValidationException; + String getBaseUrl(TenantId tenantId, CustomerId customerId, HttpServletRequest httpServletRequest); + } diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java index d44dd867de..08f87ac667 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java @@ -175,7 +175,6 @@ public abstract class AbstractControllerTest { .apply(springSecurity()).build(); } loginSysAdmin(); - Tenant tenant = new Tenant(); tenant.setTitle(TEST_TENANT_NAME); Tenant savedTenant = doPost("/api/tenant", tenant, Tenant.class); diff --git a/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java index 43cd62ea97..595f64b3a4 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/TbMsgPackProcessingContextTest.java @@ -51,7 +51,7 @@ public class TbMsgPackProcessingContextTest { messages.put(UUID.randomUUID(), new TbProtoQueueMsg<>(UUID.randomUUID(), null)); } when(strategyMock.getPendingMap()).thenReturn(messages); - TbMsgPackProcessingContext context = new TbMsgPackProcessingContext(strategyMock); + TbMsgPackProcessingContext context = new TbMsgPackProcessingContext("Main", strategyMock); for (UUID uuid : messages.keySet()) { for (int i = 0; i < parallelCount; i++) { executorService.submit(() -> context.onSuccess(uuid)); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeInfo.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeInfo.java index 0c2d38b3b2..9af3f20363 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeInfo.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/RuleNodeInfo.java @@ -15,12 +15,16 @@ */ package org.thingsboard.server.common.msg.queue; +import lombok.Getter; import org.thingsboard.server.common.data.id.RuleNodeId; public class RuleNodeInfo { private final String label; + @Getter + private final RuleNodeId ruleNodeId; public RuleNodeInfo(RuleNodeId id, String ruleChainName, String ruleNodeName) { + this.ruleNodeId = id; this.label = "[RuleChain: " + ruleChainName + "|RuleNode: " + ruleNodeName + "(" + id + ")]"; } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java index 4103a4efc1..3f6927adb1 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/queue/TbMsgCallback.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.common.msg.queue; +import org.thingsboard.server.common.data.id.RuleNodeId; + public interface TbMsgCallback { TbMsgCallback EMPTY = new TbMsgCallback() { @@ -34,7 +36,11 @@ public interface TbMsgCallback { void onFailure(RuleEngineException e); - default void visit(RuleNodeInfo ruleNodeInfo) { + default void onProcessingStart(RuleNodeInfo ruleNodeInfo) { } + default void onProcessingEnd(RuleNodeId ruleNodeId) { + } + + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java index 3e508b09e6..a4e977b6f0 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsProducerTemplate.java @@ -87,8 +87,9 @@ public class TbAwsSqsProducerTemplate implements TbQueuePr sendMsgRequest.withQueueUrl(getQueueUrl(tpi.getFullTopicName())); sendMsgRequest.withMessageBody(gson.toJson(new DefaultTbQueueMsg(msg))); - sendMsgRequest.withMessageGroupId(tpi.getTopic()); - sendMsgRequest.withMessageDeduplicationId(UUID.randomUUID().toString()); + String sqsMsgId = UUID.randomUUID().toString(); + sendMsgRequest.withMessageGroupId(sqsMsgId); + sendMsgRequest.withMessageDeduplicationId(sqsMsgId); ListenableFuture future = producerExecutor.submit(() -> sqsClient.sendMessage(sendMsgRequest)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java index 291b5a032d..8a0bc233cf 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -54,8 +55,10 @@ import org.thingsboard.server.dao.tenant.TenantDao; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import static org.thingsboard.server.dao.service.Validator.validateId; @@ -135,6 +138,10 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC return null; } + if (CollectionUtils.isNotEmpty(ruleChainMetaData.getConnections())) { + validateCircles(ruleChainMetaData.getConnections()); + } + List nodes = ruleChainMetaData.getNodes(); List toAddOrUpdate = new ArrayList<>(); List toDelete = new ArrayList<>(); @@ -217,6 +224,31 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC return loadRuleChainMetaData(tenantId, ruleChainMetaData.getRuleChainId()); } + private void validateCircles(List connectionInfos) { + Map> connectionsMap = new HashMap<>(); + for (NodeConnectionInfo nodeConnection : connectionInfos) { + if (nodeConnection.getFromIndex() == nodeConnection.getToIndex()) { + throw new DataValidationException("Can't create the relation to yourself."); + } + connectionsMap + .computeIfAbsent(nodeConnection.getFromIndex(), from -> new HashSet<>()) + .add(nodeConnection.getToIndex()); + } + connectionsMap.keySet().forEach(key -> validateCircles(key, connectionsMap.get(key), connectionsMap)); + } + + private void validateCircles(int from, Set toList, Map> connectionsMap) { + if (toList == null) { + return; + } + for (Integer to : toList) { + if (from == to) { + throw new DataValidationException("Can't create circling relations in rule chain."); + } + validateCircles(from, connectionsMap.get(to), connectionsMap); + } + } + @Override public RuleChainMetaData loadRuleChainMetaData(TenantId tenantId, RuleChainId ruleChainId) { Validator.validateId(ruleChainId, "Incorrect rule chain id."); @@ -299,7 +331,6 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC } } - @Override public List getRuleChainNodes(TenantId tenantId, RuleChainId ruleChainId) { Validator.validateId(ruleChainId, "Incorrect rule chain id for search request."); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRuleChainServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRuleChainServiceTest.java index f9ec01a00a..53dc55a0ec 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRuleChainServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRuleChainServiceTest.java @@ -319,6 +319,16 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest { ruleChainService.deleteRuleChainById(tenantId, savedRuleChainMetaData.getRuleChainId()); } + @Test(expected = DataValidationException.class) + public void testUpdateRuleChainMetaDataWithCirclingRelation() throws Exception { + ruleChainService.saveRuleChainMetaData(tenantId, createRuleChainMetadataWithCirclingRelation()); + } + + @Test(expected = DataValidationException.class) + public void testUpdateRuleChainMetaDataWithCirclingRelation2() throws Exception { + ruleChainService.saveRuleChainMetaData(tenantId, createRuleChainMetadataWithCirclingRelation2()); + } + @Test public void testGetDefaultEdgeRuleChains() throws Exception { RuleChainId ruleChainId = saveRuleChainAndSetDefaultEdge("Default Edge Rule Chain 1"); @@ -397,5 +407,85 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest { return ruleChainService.saveRuleChainMetaData(tenantId, ruleChainMetaData); } + private RuleChainMetaData createRuleChainMetadataWithCirclingRelation() throws Exception { + RuleChain ruleChain = new RuleChain(); + ruleChain.setName("My RuleChain"); + ruleChain.setTenantId(tenantId); + RuleChain savedRuleChain = ruleChainService.saveRuleChain(ruleChain); + RuleChainMetaData ruleChainMetaData = new RuleChainMetaData(); + ruleChainMetaData.setRuleChainId(savedRuleChain.getId()); + + ObjectMapper mapper = new ObjectMapper(); + + RuleNode ruleNode1 = new RuleNode(); + ruleNode1.setName("name1"); + ruleNode1.setType("type1"); + ruleNode1.setConfiguration(mapper.readTree("\"key1\": \"val1\"")); + + RuleNode ruleNode2 = new RuleNode(); + ruleNode2.setName("name2"); + ruleNode2.setType("type2"); + ruleNode2.setConfiguration(mapper.readTree("\"key2\": \"val2\"")); + + RuleNode ruleNode3 = new RuleNode(); + ruleNode3.setName("name3"); + ruleNode3.setType("type3"); + ruleNode3.setConfiguration(mapper.readTree("\"key3\": \"val3\"")); + + List ruleNodes = new ArrayList<>(); + ruleNodes.add(ruleNode1); + ruleNodes.add(ruleNode2); + ruleNodes.add(ruleNode3); + ruleChainMetaData.setFirstNodeIndex(0); + ruleChainMetaData.setNodes(ruleNodes); + + ruleChainMetaData.addConnectionInfo(0,1,"success"); + ruleChainMetaData.addConnectionInfo(0,2,"fail"); + ruleChainMetaData.addConnectionInfo(1,2,"success"); + ruleChainMetaData.addConnectionInfo(2,2,"success"); + + return ruleChainMetaData; + } + + private RuleChainMetaData createRuleChainMetadataWithCirclingRelation2() throws Exception { + RuleChain ruleChain = new RuleChain(); + ruleChain.setName("My RuleChain"); + ruleChain.setTenantId(tenantId); + RuleChain savedRuleChain = ruleChainService.saveRuleChain(ruleChain); + + RuleChainMetaData ruleChainMetaData = new RuleChainMetaData(); + ruleChainMetaData.setRuleChainId(savedRuleChain.getId()); + + ObjectMapper mapper = new ObjectMapper(); + + RuleNode ruleNode1 = new RuleNode(); + ruleNode1.setName("name1"); + ruleNode1.setType("type1"); + ruleNode1.setConfiguration(mapper.readTree("\"key1\": \"val1\"")); + + RuleNode ruleNode2 = new RuleNode(); + ruleNode2.setName("name2"); + ruleNode2.setType("type2"); + ruleNode2.setConfiguration(mapper.readTree("\"key2\": \"val2\"")); + + RuleNode ruleNode3 = new RuleNode(); + ruleNode3.setName("name3"); + ruleNode3.setType("type3"); + ruleNode3.setConfiguration(mapper.readTree("\"key3\": \"val3\"")); + + List ruleNodes = new ArrayList<>(); + ruleNodes.add(ruleNode1); + ruleNodes.add(ruleNode2); + ruleNodes.add(ruleNode3); + ruleChainMetaData.setFirstNodeIndex(0); + ruleChainMetaData.setNodes(ruleNodes); + + ruleChainMetaData.addConnectionInfo(0,1,"success"); + ruleChainMetaData.addConnectionInfo(0,2,"fail"); + ruleChainMetaData.addConnectionInfo(1,2,"success"); + ruleChainMetaData.addConnectionInfo(2,0,"success"); + + return ruleChainMetaData; + } } diff --git a/docker/queue-kafka.env b/docker/queue-kafka.env index 0207d64ef5..63107942fb 100644 --- a/docker/queue-kafka.env +++ b/docker/queue-kafka.env @@ -1,3 +1,2 @@ TB_QUEUE_TYPE=kafka TB_KAFKA_SERVERS=kafka:9092 -TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES=retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100 diff --git a/msa/js-executor/queue/awsSqsTemplate.js b/msa/js-executor/queue/awsSqsTemplate.js index 52d4997f33..24315e5cad 100644 --- a/msa/js-executor/queue/awsSqsTemplate.js +++ b/msa/js-executor/queue/awsSqsTemplate.js @@ -52,11 +52,13 @@ function AwsSqsProducer() { queueUrls.set(responseTopic, responseQueueUrl); } + let msgId = uuid(); + let params = { MessageBody: msgBody, QueueUrl: responseQueueUrl, - MessageGroupId: 'js_eval', - MessageDeduplicationId: uuid() + MessageGroupId: msgId, + MessageDeduplicationId: msgId }; return new Promise((resolve, reject) => { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 8de0068561..518fab95eb 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -143,7 +143,7 @@ public interface TbContext { TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId); // TODO: Does this changes the message? - TbMsg alarmCreatedMsg(Alarm alarm, RuleNodeId ruleNodeId); + TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action); /* * diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java index b3344105bb..6ed5a05589 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java @@ -25,6 +25,7 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -61,13 +62,11 @@ public abstract class TbAbstractAlarmNode ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Created"), - throwable -> ctx.tellFailure(toAlarmMsg(ctx, alarmResult, msg), throwable)); + tellNext(ctx, msg, alarmResult, DataConstants.ENTITY_CREATED, "Created"); } else if (alarmResult.isUpdated) { - ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Updated"); + tellNext(ctx, msg, alarmResult, DataConstants.ENTITY_UPDATED, "Updated"); } else if (alarmResult.isCleared) { - ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), "Cleared"); + tellNext(ctx, msg, alarmResult, DataConstants.ALARM_CLEAR, "Cleared"); } else { ctx.tellSuccess(msg); } @@ -126,4 +125,10 @@ public abstract class TbAbstractAlarmNode ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), alarmResultMsgType), + throwable -> ctx.tellFailure(toAlarmMsg(ctx, alarmResult, msg), throwable)); + } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java index 114967eb51..ba6a98afb1 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbAlarmNodeTest.java @@ -34,6 +34,7 @@ import org.thingsboard.rule.engine.api.ScriptEngine; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.DeviceId; @@ -249,6 +250,8 @@ public class TbAlarmNodeTest { node.onMsg(ctx, msg); + verify(ctx).enqueue(any(), successCaptor.capture(), failureCaptor.capture()); + successCaptor.getValue().run(); verify(ctx).tellNext(any(), eq("Updated")); ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); @@ -297,6 +300,8 @@ public class TbAlarmNodeTest { node.onMsg(ctx, msg); + verify(ctx).enqueue(any(), successCaptor.capture(), failureCaptor.capture()); + successCaptor.getValue().run(); verify(ctx).tellNext(any(), eq("Cleared")); ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); @@ -345,6 +350,8 @@ public class TbAlarmNodeTest { node.onMsg(ctx, msg); + verify(ctx).enqueue(any(), successCaptor.capture(), failureCaptor.capture()); + successCaptor.getValue().run(); verify(ctx).tellNext(any(), eq("Cleared")); ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); diff --git a/ui/src/app/admin/general-settings.tpl.html b/ui/src/app/admin/general-settings.tpl.html index b6eea094bf..8a97f2300a 100644 --- a/ui/src/app/admin/general-settings.tpl.html +++ b/ui/src/app/admin/general-settings.tpl.html @@ -34,6 +34,14 @@
admin.base-url-required
+ + {{ 'admin.prohibit-different-url' | translate }} + +
+ admin.prohibit-different-url-hint +
{{'action.save' | translate}}
diff --git a/ui/src/app/locale/locale.constant-en_US.json b/ui/src/app/locale/locale.constant-en_US.json index 412280603c..d46f3d78d8 100644 --- a/ui/src/app/locale/locale.constant-en_US.json +++ b/ui/src/app/locale/locale.constant-en_US.json @@ -74,6 +74,8 @@ "test-mail-sent": "Test mail was successfully sent!", "base-url": "Base URL", "base-url-required": "Base URL is required.", + "prohibit-different-url": "Prohibit to use hostname from the client request headers", + "prohibit-different-url-hint": "This setting should be enabled for production environments. May cause security issues when disabled", "mail-from": "Mail From", "mail-from-required": "Mail From is required.", "smtp-protocol": "SMTP protocol",