diff --git a/application/src/main/data/json/demo/dashboards/rule_engine_statistics.json b/application/src/main/data/json/demo/dashboards/rule_engine_statistics.json index 0cfcebef5d..d167e4087c 100644 --- a/application/src/main/data/json/demo/dashboards/rule_engine_statistics.json +++ b/application/src/main/data/json/demo/dashboards/rule_engine_statistics.json @@ -564,6 +564,7 @@ }, "tooltipDateColor": "rgba(0, 0, 0, 0.76)", "tooltipDateInterval": true, + "tooltipHideZeroFalse": true, "tooltipBackgroundColor": "rgba(255, 255, 255, 0.76)", "tooltipBackgroundBlur": 4, "animation": { @@ -977,6 +978,7 @@ }, "tooltipDateColor": "rgba(0, 0, 0, 0.76)", "tooltipDateInterval": true, + "tooltipHideZeroFalse": true, "tooltipBackgroundColor": "rgba(255, 255, 255, 0.76)", "tooltipBackgroundBlur": 4, "animation": { diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 2443037ca0..8daee6800b 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -199,6 +199,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; @@ -420,7 +421,7 @@ public abstract class BaseController { return handleException(exception, true); } - private ThingsboardException handleException(Exception exception, boolean logException) { + private ThingsboardException handleException(Throwable exception, boolean logException) { if (logException && logControllerErrorStackTrace) { try { SecurityUser user = getCurrentUser(); @@ -431,6 +432,9 @@ public abstract class BaseController { } Throwable cause = exception.getCause(); + if (exception instanceof ExecutionException) { + exception = cause; + } if (exception instanceof ThingsboardException) { return (ThingsboardException) exception; } else if (exception instanceof IllegalArgumentException || exception instanceof IncorrectParameterException diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 73da3694f9..5c95a0e98f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -69,7 +69,9 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -193,6 +195,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler"); this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler"); this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service"); + this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS); log.info("Edge RPC service initialized!"); } @@ -262,6 +265,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @Override public void updateEdge(TenantId tenantId, Edge edge) { + if (edge == null) { + log.warn("[{}] Edge is null - edge is removed and outdated notification is in process!", tenantId); + return; + } EdgeGrpcSession session = sessions.get(edge.getId()); if (session != null && session.isConnected()) { log.debug("[{}] Updating configuration for edge [{}] [{}]", tenantId, edge.getName(), edge.getId()); @@ -459,11 +466,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void processEdgeEventMigrationIfNeeded(EdgeGrpcSession session, EdgeId edgeId) throws Exception { boolean isMigrationProcessed = edgeEventsMigrationProcessed.getOrDefault(edgeId, Boolean.FALSE); if (!isMigrationProcessed) { + log.info("Starting edge event migration for edge [{}]", edgeId.getId()); Boolean eventsExist = session.migrateEdgeEvents().get(); if (Boolean.TRUE.equals(eventsExist)) { + log.info("Migration still in progress for edge [{}]", edgeId.getId()); sessionNewEvents.put(edgeId, true); scheduleEdgeEventsCheck(session); } else if (Boolean.FALSE.equals(eventsExist)) { + log.info("Migration completed for edge [{}]", edgeId.getId()); edgeEventsMigrationProcessed.put(edgeId, true); } } @@ -610,4 +620,27 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } } + private void destroyKafkaSessionIfDisconnectedAndConsumerActive() { + try { + List toRemove = new ArrayList<>(); + for (EdgeGrpcSession session : sessions.values()) { + if (session instanceof KafkaEdgeGrpcSession kafkaSession && + !kafkaSession.isConnected() && + kafkaSession.getConsumer() != null && + kafkaSession.getConsumer().getConsumer() != null && + !kafkaSession.getConsumer().getConsumer().isStopped()) { + toRemove.add(kafkaSession.getEdge().getId()); + } + } + for (EdgeId edgeId : toRemove) { + log.info("[{}] Destroying session for edge because edge is not connected", edgeId); + EdgeGrpcSession removed = sessions.remove(edgeId); + if (removed instanceof KafkaEdgeGrpcSession kafkaSession) { + kafkaSession.destroy(); + } + } + } catch (Exception e) { + log.warn("Failed to cleanup kafka sessions", e); + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 8922fa6008..b6ecd848ad 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -292,11 +292,11 @@ public abstract class EdgeGrpcSession implements Closeable { protected void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture> result) { try { - log.trace("[{}] Start processing edge events, fetcher = {}, pageLink = {}", sessionId, fetcher.getClass().getSimpleName(), pageLink); + log.trace("[{}] Start processing edge events, fetcher = {}, pageLink = {}", edge.getId(), fetcher.getClass().getSimpleName(), pageLink); processHighPriorityEvents(); PageData pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink); if (isConnected() && !pageData.getData().isEmpty()) { - log.trace("[{}][{}][{}] event(s) are going to be processed.", tenantId, sessionId, pageData.getData().size()); + log.trace("[{}][{}][{}] event(s) are going to be processed.", tenantId, edge.getId(), pageData.getData().size()); List downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData()); Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() { @Override @@ -323,16 +323,16 @@ public abstract class EdgeGrpcSession implements Closeable { @Override public void onFailure(Throwable t) { - log.error("[{}] Failed to send downlink msgs pack", sessionId, t); + log.error("[{}] Failed to send downlink msgs pack", edge.getId(), t); result.setException(t); } }, ctx.getGrpcCallbackExecutorService()); } else { - log.trace("[{}] no event(s) found. Stop processing edge events, fetcher = {}, pageLink = {}", sessionId, fetcher.getClass().getSimpleName(), pageLink); + log.trace("[{}] no event(s) found. Stop processing edge events, fetcher = {}, pageLink = {}", edge.getId(), fetcher.getClass().getSimpleName(), pageLink); result.set(null); } } catch (Exception e) { - log.error("[{}] Failed to fetch edge events", sessionId, e); + log.error("[{}] Failed to fetch edge events", edge.getId(), e); result.setException(e); } } @@ -459,9 +459,9 @@ public abstract class EdgeGrpcSession implements Closeable { ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId) .edgeId(edge.getId()).customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(error).build()); } - log.warn("[{}][{}] {}, attempt: {}", tenantId, sessionId, failureMsg, attempt); + log.warn("[{}][{}] {}, attempt: {}", tenantId, edge.getId(), failureMsg, attempt); } - log.trace("[{}][{}][{}] downlink msg(s) are going to be send.", tenantId, sessionId, copy.size()); + log.trace("[{}][{}][{}] downlink msg(s) are going to be send.", tenantId, edge.getId(), copy.size()); for (DownlinkMsg downlinkMsg : copy) { if (clientMaxInboundMessageSize != 0 && downlinkMsg.getSerializedSize() > clientMaxInboundMessageSize) { String error = String.format("Client max inbound message size %s is exceeded. Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE " + @@ -483,7 +483,7 @@ public abstract class EdgeGrpcSession implements Closeable { } else { String failureMsg = String.format("Failed to deliver messages: %s", copy); log.warn("[{}][{}] Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}", - tenantId, sessionId, MAX_DOWNLINK_ATTEMPTS, copy); + tenantId, edge.getId(), MAX_DOWNLINK_ATTEMPTS, copy); ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId()) .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg) .error("Failed to deliver messages after " + MAX_DOWNLINK_ATTEMPTS + " attempts").build()); @@ -493,7 +493,7 @@ public abstract class EdgeGrpcSession implements Closeable { stopCurrentSendDownlinkMsgsTask(false); } } catch (Exception e) { - log.warn("[{}][{}] Failed to send downlink msgs. Error msg {}", tenantId, sessionId, e.getMessage(), e); + log.warn("[{}][{}] Failed to send downlink msgs. Error msg {}", tenantId, edge.getId(), e.getMessage(), e); stopCurrentSendDownlinkMsgsTask(true); } }; @@ -540,7 +540,7 @@ public abstract class EdgeGrpcSession implements Closeable { stopCurrentSendDownlinkMsgsTask(false); } } catch (Exception e) { - log.error("[{}][{}] Can't process downlink response message [{}]", tenantId, sessionId, msg, e); + log.error("[{}][{}] Can't process downlink response message [{}]", tenantId, edge.getId(), msg, e); } } @@ -555,12 +555,12 @@ public abstract class EdgeGrpcSession implements Closeable { while ((event = highPriorityQueue.poll()) != null) { highPriorityEvents.add(event); } - log.trace("[{}][{}] Sending high priority events {}", tenantId, sessionId, highPriorityEvents.size()); + log.trace("[{}][{}] Sending high priority events {}", tenantId, edge.getId(), highPriorityEvents.size()); List downlinkMsgsPack = convertToDownlinkMsgsPack(highPriorityEvents); sendDownlinkMsgsPack(downlinkMsgsPack).get(); } } catch (Exception e) { - log.error("[{}] Failed to process high priority events", sessionId, e); + log.error("[{}] Failed to process high priority events", edge.getId(), e); } } @@ -577,7 +577,7 @@ public abstract class EdgeGrpcSession implements Closeable { Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()), ctx.getEdgeEventService()); log.trace("[{}][{}] starting processing edge events, previousStartTs = {}, previousStartSeqId = {}", - tenantId, sessionId, previousStartTs, previousStartSeqId); + tenantId, edge.getId(), previousStartTs, previousStartSeqId); Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() { @Override public void onSuccess(@Nullable Pair newStartTsAndSeqId) { @@ -586,7 +586,7 @@ public abstract class EdgeGrpcSession implements Closeable { Futures.addCallback(updateFuture, new FutureCallback<>() { @Override public void onSuccess(@Nullable AttributesSaveResult saveResult) { - log.debug("[{}][{}] queue offset was updated [{}]", tenantId, sessionId, newStartTsAndSeqId); + log.debug("[{}][{}] queue offset was updated [{}]", tenantId, edge.getId(), newStartTsAndSeqId); boolean newEventsAvailable; if (fetcher.isSeqIdNewCycleStarted()) { newEventsAvailable = isNewEdgeEventsAvailable(); @@ -601,28 +601,28 @@ public abstract class EdgeGrpcSession implements Closeable { @Override public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to update queue offset [{}]", tenantId, sessionId, newStartTsAndSeqId, t); + log.error("[{}][{}] Failed to update queue offset [{}]", tenantId, edge.getId(), newStartTsAndSeqId, t); result.setException(t); } }, ctx.getGrpcCallbackExecutorService()); } else { - log.trace("[{}][{}] newStartTsAndSeqId is null. Skipping iteration without db update", tenantId, sessionId); + log.trace("[{}][{}] newStartTsAndSeqId is null. Skipping iteration without db update", tenantId, edge.getId()); result.set(Boolean.FALSE); } } @Override public void onFailure(Throwable t) { - log.error("[{}][{}] Failed to process events", tenantId, sessionId, t); + log.error("[{}][{}] Failed to process events", tenantId, edge.getId(), t); result.setException(t); } }, ctx.getGrpcCallbackExecutorService()); } else { if (isSyncInProgress()) { - log.trace("[{}][{}] edge sync is not completed yet. Skipping iteration", tenantId, sessionId); + log.trace("[{}][{}] edge sync is not completed yet. Skipping iteration", tenantId, edge.getId()); result.set(Boolean.TRUE); } else { - log.trace("[{}][{}] edge is not connected. Skipping iteration", tenantId, sessionId); + log.trace("[{}][{}] edge is not connected. Skipping iteration", tenantId, edge.getId()); result.set(null); } } @@ -632,7 +632,7 @@ public abstract class EdgeGrpcSession implements Closeable { protected List convertToDownlinkMsgsPack(List edgeEvents) { List result = new ArrayList<>(); for (EdgeEvent edgeEvent : edgeEvents) { - log.trace("[{}][{}] converting edge event to downlink msg [{}]", tenantId, sessionId, edgeEvent); + log.trace("[{}][{}] converting edge event to downlink msg [{}]", tenantId, edge.getId(), edgeEvent); DownlinkMsg downlinkMsg = null; try { switch (edgeEvent.getAction()) { @@ -641,16 +641,16 @@ public abstract class EdgeGrpcSession implements Closeable { ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> { downlinkMsg = convertEntityEventToDownlink(edgeEvent); if (downlinkMsg != null && downlinkMsg.getWidgetTypeUpdateMsgCount() > 0) { - log.trace("[{}][{}] widgetTypeUpdateMsg message processed, downlinkMsgId = {}", tenantId, sessionId, downlinkMsg.getDownlinkMsgId()); + log.trace("[{}][{}] widgetTypeUpdateMsg message processed, downlinkMsgId = {}", tenantId, edge.getId(), downlinkMsg.getDownlinkMsgId()); } else { - log.trace("[{}][{}] entity message processed [{}]", tenantId, sessionId, downlinkMsg); + log.trace("[{}][{}] entity message processed [{}]", tenantId, edge.getId(), downlinkMsg); } } case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED -> downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent); - default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, sessionId, edgeEvent.getAction()); + default -> log.warn("[{}][{}] Unsupported action type [{}]", tenantId, edge.getId(), edgeEvent.getAction()); } } catch (Exception e) { - log.trace("[{}][{}] Exception during converting edge event to downlink msg", tenantId, sessionId, e); + log.trace("[{}][{}] Exception during converting edge event to downlink msg", tenantId, edge.getId(), e); } if (downlinkMsg != null) { result.add(downlinkMsg); @@ -757,19 +757,19 @@ public abstract class EdgeGrpcSession implements Closeable { private void sendDownlinkMsg(ResponseMsg responseMsg) { if (isConnected()) { String responseMsgStr = StringUtils.truncate(responseMsg.toString(), 10000); - log.trace("[{}][{}] Sending downlink msg [{}]", tenantId, sessionId, responseMsgStr); + log.trace("[{}][{}] Sending downlink msg [{}]", tenantId, edge.getId(), responseMsgStr); downlinkMsgLock.lock(); String downlinkMsgStr = responseMsg.hasDownlinkMsg() ? String.valueOf(responseMsg.getDownlinkMsg().getDownlinkMsgId()) : responseMsgStr; try { outputStream.onNext(responseMsg); } catch (Exception e) { - log.trace("[{}][{}] Failed to send downlink message [{}]", tenantId, sessionId, downlinkMsgStr, e); + log.trace("[{}][{}] Failed to send downlink message [{}]", tenantId, edge.getId(), downlinkMsgStr, e); connected = false; sessionCloseListener.accept(edge, sessionId); } finally { downlinkMsgLock.unlock(); } - log.trace("[{}][{}] downlink msg successfully sent [{}]", tenantId, sessionId, downlinkMsgStr); + log.trace("[{}][{}] downlink msg successfully sent [{}]", tenantId, edge.getId(), downlinkMsgStr); } } @@ -909,8 +909,8 @@ public abstract class EdgeGrpcSession implements Closeable { } } catch (Exception e) { String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg); - log.trace("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e); - ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(edge.getTenantId()).edgeId(edge.getId()) + log.trace("[{}][{}] Can't process uplink msg [{}]", tenantId, edge.getId(), uplinkMsg, e); + ctx.getRuleProcessor().process(EdgeCommunicationFailureTrigger.builder().tenantId(tenantId).edgeId(edge.getId()) .customerId(edge.getCustomerId()).edgeName(edge.getName()).failureMsg(failureMsg).error(e.getMessage()).build()); return Futures.immediateFailedFuture(e); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java index 37687f14a9..daffe9db11 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.edge.rpc; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.stub.StreamObserver; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.edge.Edge; @@ -56,6 +57,7 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { private volatile boolean isHighPriorityProcessing; + @Getter private QueueConsumerManager> consumer; private ExecutorService consumerExecutor; @@ -72,31 +74,28 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { } private void processMsgs(List> msgs, TbQueueConsumer> consumer) { - log.trace("[{}][{}] starting processing edge events", tenantId, sessionId); - if (isConnected() && !isSyncInProgress() && !isHighPriorityProcessing) { - List edgeEvents = new ArrayList<>(); - for (TbProtoQueueMsg msg : msgs) { - EdgeEvent edgeEvent = ProtoUtils.fromProto(msg.getValue().getEdgeEventMsg()); - edgeEvents.add(edgeEvent); + log.trace("[{}][{}] starting processing edge events", tenantId, edge.getId()); + if (!isConnected() || isSyncInProgress() || isHighPriorityProcessing) { + log.debug("[{}][{}] edge not connected, edge sync is not completed or high priority processing in progress, " + + "connected = {}, sync in progress = {}, high priority in progress = {}. Skipping iteration", + tenantId, edge.getId(), isConnected(), isSyncInProgress(), isHighPriorityProcessing); + return; + } + List edgeEvents = new ArrayList<>(); + for (TbProtoQueueMsg msg : msgs) { + EdgeEvent edgeEvent = ProtoUtils.fromProto(msg.getValue().getEdgeEventMsg()); + edgeEvents.add(edgeEvent); + } + List downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents); + try { + boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get(); + if (isInterrupted) { + log.debug("[{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId()); + } else { + consumer.commit(); } - List downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents); - try { - boolean isInterrupted = sendDownlinkMsgsPack(downlinkMsgsPack).get(); - if (isInterrupted) { - log.debug("[{}][{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId(), sessionId); - } else { - consumer.commit(); - } - } catch (Exception e) { - log.error("[{}] Failed to process all downlink messages", sessionId, e); - } - } else { - try { - Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()); - } catch (InterruptedException interruptedException) { - log.trace("Failed to wait until the server has capacity to handle new requests", interruptedException); - } - log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", tenantId, sessionId); + } catch (Exception e) { + log.error("[{}][{}] Failed to process downlink messages", tenantId, edge.getId(), e); } } @@ -107,18 +106,23 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { @Override public ListenableFuture processEdgeEvents() { - if (consumer == null) { - this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-consumer")); - this.consumer = QueueConsumerManager.>builder() - .name("TB Edge events") - .msgPackProcessor(this::processMsgs) - .pollInterval(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()) - .consumerCreator(() -> tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edge.getId())) - .consumerExecutor(consumerExecutor) - .threadPrefix("edge-events") - .build(); - consumer.subscribe(); - consumer.launch(); + if (consumer == null || (consumer.getConsumer() != null && consumer.getConsumer().isStopped())) { + try { + this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-consumer")); + this.consumer = QueueConsumerManager.>builder() + .name("TB Edge events [" + edge.getId() + "]") + .msgPackProcessor(this::processMsgs) + .pollInterval(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval()) + .consumerCreator(() -> tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edge.getId())) + .consumerExecutor(consumerExecutor) + .threadPrefix("edge-events-" + edge.getId()) + .build(); + consumer.subscribe(); + consumer.launch(); + } catch (Exception e) { + destroy(); + log.warn("[{}][{}] Failed to start edge event consumer", sessionId, edge.getId(), e); + } } return Futures.immediateFuture(Boolean.FALSE); } @@ -132,8 +136,18 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { @Override public void destroy() { - consumer.stop(); - consumerExecutor.shutdown(); + try { + if (consumer != null) { + consumer.stop(); + } + } finally { + consumer = null; + } + try { + if (consumerExecutor != null) { + consumerExecutor.shutdown(); + } + } catch (Exception ignored) {} } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java b/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java index 62455117e0..d6cee80ebf 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.notification.NotificationRequestStatus import org.thingsboard.server.common.data.notification.info.NotificationInfo; import org.thingsboard.server.common.data.notification.rule.NotificationRule; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger; +import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger.DeduplicationStrategy; import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; @@ -66,8 +67,8 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess private final NotificationDeduplicationService deduplicationService; private final PartitionService partitionService; private final RateLimitService rateLimitService; - @Autowired @Lazy - private NotificationCenter notificationCenter; + @Lazy + private final NotificationCenter notificationCenter; private final NotificationExecutorService notificationExecutor; private final Map triggerProcessors = new EnumMap<>(NotificationRuleTriggerType.class); @@ -82,14 +83,11 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess if (enabledRules.isEmpty()) { return; } - if (trigger.deduplicate()) { - enabledRules = new ArrayList<>(enabledRules); - enabledRules.removeIf(rule -> deduplicationService.alreadyProcessed(trigger, rule)); - } - final List rules = enabledRules; - for (NotificationRule rule : rules) { + + List rulesToProcess = filterNotificationRules(trigger, enabledRules); + for (NotificationRule rule : rulesToProcess) { try { - processNotificationRule(rule, trigger); + processNotificationRule(rule, trigger, DeduplicationStrategy.ONLY_MATCHING.equals(trigger.getDeduplicationStrategy())); } catch (Throwable e) { log.error("Failed to process notification rule {} for trigger type {} with trigger object {}", rule.getId(), rule.getTriggerType(), trigger, e); } @@ -100,7 +98,20 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess }); } - private void processNotificationRule(NotificationRule rule, NotificationRuleTrigger trigger) { + private List filterNotificationRules(NotificationRuleTrigger trigger, List enabledRules) { + List rulesToProcess = new ArrayList<>(enabledRules); + rulesToProcess.removeIf(rule -> switch (trigger.getDeduplicationStrategy()) { + case ONLY_MATCHING -> { + boolean matched = matchesFilter(trigger, rule.getTriggerConfig()); + yield !matched || deduplicationService.alreadyProcessed(trigger, rule); + } + case ALL -> deduplicationService.alreadyProcessed(trigger, rule); + default -> false; + }); + return rulesToProcess; + } + + private void processNotificationRule(NotificationRule rule, NotificationRuleTrigger trigger, boolean alreadyMatched) { NotificationRuleTriggerConfig triggerConfig = rule.getTriggerConfig(); log.debug("Processing notification rule '{}' for trigger type {}", rule.getName(), rule.getTriggerType()); @@ -114,7 +125,7 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess return; } - if (matchesFilter(trigger, triggerConfig)) { + if (alreadyMatched || matchesFilter(trigger, triggerConfig)) { if (!rateLimitService.checkRateLimit(LimitedApi.NOTIFICATION_REQUESTS_PER_RULE, rule.getTenantId(), rule.getId())) { log.debug("[{}] Rate limit for notification requests per rule was exceeded (rule '{}')", rule.getTenantId(), rule.getName()); return; diff --git a/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/ResourcesShortageTriggerProcessor.java b/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/ResourcesShortageTriggerProcessor.java index aefb628d2d..09c8d76eca 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/ResourcesShortageTriggerProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/ResourcesShortageTriggerProcessor.java @@ -39,7 +39,12 @@ public class ResourcesShortageTriggerProcessor implements NotificationRuleTrigge @Override public RuleOriginatedNotificationInfo constructNotificationInfo(ResourcesShortageTrigger trigger) { - return ResourcesShortageNotificationInfo.builder().resource(trigger.getResource().name()).usage(trigger.getUsage()).build(); + return ResourcesShortageNotificationInfo.builder() + .resource(trigger.getResource().name()) + .usage(trigger.getUsage()) + .serviceId(trigger.getServiceId()) + .serviceType(trigger.getServiceType()) + .build(); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java b/application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java index adc87957d3..a38c2721c1 100644 --- a/application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java +++ b/application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java @@ -185,9 +185,14 @@ public class DefaultSystemInfoService extends TbApplicationEventListener clusterSystemData = getSystemData(serviceInfoProvider.getServiceInfo()); clusterSystemData.forEach(data -> { - notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.CPU).usage(data.getCpuUsage()).build()); - notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.RAM).usage(data.getMemoryUsage()).build()); - notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.STORAGE).usage(data.getDiscUsage()).build()); + Arrays.stream(Resource.values()).forEach(resource -> { + notificationRuleProcessor.process(ResourcesShortageTrigger.builder() + .resource(resource) + .serviceId(data.getServiceId()) + .serviceType(data.getServiceType()) + .usage(extractResourceUsage(data, resource)) + .build()); + }); }); BasicTsKvEntry clusterDataKv = new BasicTsKvEntry(ts, new JsonDataEntry("clusterSystemData", JacksonUtil.toString(clusterSystemData))); doSave(Arrays.asList(new BasicTsKvEntry(ts, new BooleanDataEntry("clusterMode", true)), clusterDataKv)); @@ -200,17 +205,17 @@ public class DefaultSystemInfoService extends TbApplicationEventListener { long value = (long) v; tsList.add(new BasicTsKvEntry(ts, new LongDataEntry("cpuUsage", value))); - notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.CPU).usage(value).build()); + notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.CPU).usage(value).serviceId(serviceInfoProvider.getServiceId()).serviceType(serviceInfoProvider.getServiceType()).build()); }); getMemoryUsage().ifPresent(v -> { long value = (long) v; tsList.add(new BasicTsKvEntry(ts, new LongDataEntry("memoryUsage", value))); - notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.RAM).usage(value).build()); + notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.RAM).usage(value).serviceId(serviceInfoProvider.getServiceId()).serviceType(serviceInfoProvider.getServiceType()).build()); }); getDiscSpaceUsage().ifPresent(v -> { long value = (long) v; tsList.add(new BasicTsKvEntry(ts, new LongDataEntry("discUsage", value))); - notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.STORAGE).usage(value).build()); + notificationRuleProcessor.process(ResourcesShortageTrigger.builder().resource(Resource.STORAGE).usage(value).serviceId(serviceInfoProvider.getServiceId()).serviceType(serviceInfoProvider.getServiceType()).build()); }); getCpuCount().ifPresent(v -> tsList.add(new BasicTsKvEntry(ts, new LongDataEntry("cpuCount", (long) v)))); @@ -258,6 +263,14 @@ public class DefaultSystemInfoService extends TbApplicationEventListener info.getCpuUsage(); + case RAM -> info.getMemoryUsage(); + case STORAGE -> info.getDiscUsage(); + }; + } + @PreDestroy private void destroy() { if (scheduler != null) { diff --git a/application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java b/application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java index 282e959fcd..bab70ea505 100644 --- a/application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java +++ b/application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java @@ -825,6 +825,8 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest { notificationRuleProcessor.process(ResourcesShortageTrigger.builder() .resource(Resource.RAM) .usage(15L) + .serviceType("serviceType") + .serviceId("serviceId") .build()); TimeUnit.MILLISECONDS.sleep(300); } @@ -837,10 +839,48 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest { notificationRuleProcessor.process(ResourcesShortageTrigger.builder() .resource(Resource.RAM) .usage(5L) + .serviceType("serviceType") + .serviceId("serviceId") .build()); await("").atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat(getMyNotifications(false, 100)).size().isOne()); } + @Test + public void testNotificationsResourcesShortage_whenThresholdChangeToMatchingFilter_thenSendNotification() throws Exception { + loginSysAdmin(); + ResourcesShortageNotificationRuleTriggerConfig triggerConfig = ResourcesShortageNotificationRuleTriggerConfig.builder() + .ramThreshold(1f) + .cpuThreshold(1f) + .storageThreshold(1f) + .build(); + NotificationRule rule = createNotificationRule(triggerConfig, "Warning: ${resource} shortage", "${resource} shortage", createNotificationTarget(tenantAdminUserId).getId()); + loginTenantAdmin(); + + Method method = DefaultSystemInfoService.class.getDeclaredMethod("saveCurrentMonolithSystemInfo"); + method.setAccessible(true); + method.invoke(systemInfoService); + + TimeUnit.SECONDS.sleep(5); + assertThat(getMyNotifications(false, 100)).size().isZero(); + + loginSysAdmin(); + triggerConfig = ResourcesShortageNotificationRuleTriggerConfig.builder() + .ramThreshold(0.01f) + .cpuThreshold(1f) + .storageThreshold(1f) + .build(); + rule.setTriggerConfig(triggerConfig); + saveNotificationRule(rule); + loginTenantAdmin(); + + method.invoke(systemInfoService); + + await().atMost(10, TimeUnit.SECONDS).until(() -> getMyNotifications(false, 100).size() == 1); + Notification notification = getMyNotifications(false, 100).get(0); + assertThat(notification.getSubject()).isEqualTo("Warning: RAM shortage"); + assertThat(notification.getText()).isEqualTo("RAM shortage"); + } + @Test public void testNotificationRuleDisabling() throws Exception { EntityActionNotificationRuleTriggerConfig triggerConfig = new EntityActionNotificationRuleTriggerConfig(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/SystemInfoData.java b/common/data/src/main/java/org/thingsboard/server/common/data/SystemInfoData.java index c979fbffd1..afbad6e3a2 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/SystemInfoData.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/SystemInfoData.java @@ -20,6 +20,7 @@ import lombok.Data; @Data public class SystemInfoData { + @Schema(description = "Service Id.") private String serviceId; @Schema(description = "Service type.") diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java index 3df7fc92fe..1e890da961 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java @@ -20,7 +20,8 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; -import org.apache.commons.lang3.BooleanUtils; + +import static org.apache.commons.lang3.BooleanUtils.toBooleanDefaultIfNull; @Getter @NoArgsConstructor @@ -34,14 +35,14 @@ public class EdqsState { private EdqsApiMode apiMode; public boolean updateEdqsReady(boolean ready) { - boolean changed = BooleanUtils.toBooleanDefaultIfNull(this.edqsReady, false) != ready; + boolean changed = toBooleanDefaultIfNull(this.edqsReady, false) != ready; this.edqsReady = ready; return changed; } @JsonIgnore public boolean isApiReady() { - return edqsReady && syncStatus == EdqsSyncStatus.FINISHED; + return toBooleanDefaultIfNull(edqsReady, false) && syncStatus == EdqsSyncStatus.FINISHED; } @JsonIgnore diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/notification/info/ResourcesShortageNotificationInfo.java b/common/data/src/main/java/org/thingsboard/server/common/data/notification/info/ResourcesShortageNotificationInfo.java index 24cb21febd..c05a10ef8f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/notification/info/ResourcesShortageNotificationInfo.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/notification/info/ResourcesShortageNotificationInfo.java @@ -30,12 +30,16 @@ public class ResourcesShortageNotificationInfo implements RuleOriginatedNotifica private String resource; private Long usage; + private String serviceId; + private String serviceType; @Override public Map getTemplateData() { return Map.of( "resource", resource, - "usage", String.valueOf(usage) + "usage", String.valueOf(usage), + "serviceId", serviceId, + "serviceType", serviceType ); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/EdgeCommunicationFailureTrigger.java b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/EdgeCommunicationFailureTrigger.java index 4124eb04f8..5672b2c98c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/EdgeCommunicationFailureTrigger.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/EdgeCommunicationFailureTrigger.java @@ -23,12 +23,16 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType; +import java.io.Serial; import java.util.concurrent.TimeUnit; @Data @Builder public class EdgeCommunicationFailureTrigger implements NotificationRuleTrigger { + @Serial + private static final long serialVersionUID = 2918443863787603524L; + private final TenantId tenantId; private final CustomerId customerId; private final EdgeId edgeId; @@ -37,8 +41,8 @@ public class EdgeCommunicationFailureTrigger implements NotificationRuleTrigger private final String error; @Override - public boolean deduplicate() { - return true; + public DeduplicationStrategy getDeduplicationStrategy() { + return DeduplicationStrategy.ALL; } @Override @@ -60,4 +64,5 @@ public class EdgeCommunicationFailureTrigger implements NotificationRuleTrigger public EntityId getOriginatorEntityId() { return edgeId; } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/EdgeConnectionTrigger.java b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/EdgeConnectionTrigger.java index fc3b69e697..0da465ec09 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/EdgeConnectionTrigger.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/EdgeConnectionTrigger.java @@ -23,12 +23,16 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType; +import java.io.Serial; import java.util.concurrent.TimeUnit; @Data @Builder public class EdgeConnectionTrigger implements NotificationRuleTrigger { + @Serial + private static final long serialVersionUID = -261939829962721957L; + private final TenantId tenantId; private final CustomerId customerId; private final EdgeId edgeId; @@ -36,8 +40,8 @@ public class EdgeConnectionTrigger implements NotificationRuleTrigger { private final String edgeName; @Override - public boolean deduplicate() { - return true; + public DeduplicationStrategy getDeduplicationStrategy() { + return DeduplicationStrategy.ALL; } @Override @@ -59,4 +63,5 @@ public class EdgeConnectionTrigger implements NotificationRuleTrigger { public EntityId getOriginatorEntityId() { return edgeId; } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/NewPlatformVersionTrigger.java b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/NewPlatformVersionTrigger.java index 204ae15e57..50ee0768d9 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/NewPlatformVersionTrigger.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/NewPlatformVersionTrigger.java @@ -22,10 +22,15 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType; +import java.io.Serial; + @Data @Builder public class NewPlatformVersionTrigger implements NotificationRuleTrigger { + @Serial + private static final long serialVersionUID = 3298785969736390092L; + private final UpdateMessage updateInfo; @Override @@ -45,8 +50,8 @@ public class NewPlatformVersionTrigger implements NotificationRuleTrigger { @Override - public boolean deduplicate() { - return true; + public DeduplicationStrategy getDeduplicationStrategy() { + return DeduplicationStrategy.ALL; } @Override diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/NotificationRuleTrigger.java b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/NotificationRuleTrigger.java index 31940d6ac8..f6cf398b44 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/NotificationRuleTrigger.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/NotificationRuleTrigger.java @@ -29,9 +29,8 @@ public interface NotificationRuleTrigger extends Serializable { EntityId getOriginatorEntityId(); - - default boolean deduplicate() { - return false; + default DeduplicationStrategy getDeduplicationStrategy() { + return DeduplicationStrategy.NONE; } default String getDeduplicationKey() { @@ -43,4 +42,10 @@ public interface NotificationRuleTrigger extends Serializable { return 0; } + enum DeduplicationStrategy { + NONE, + ALL, + ONLY_MATCHING + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/RateLimitsTrigger.java b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/RateLimitsTrigger.java index 39d570a9e0..37e984c6f5 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/RateLimitsTrigger.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/RateLimitsTrigger.java @@ -22,12 +22,16 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.limit.LimitedApi; import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType; +import java.io.Serial; import java.util.concurrent.TimeUnit; @Data @Builder public class RateLimitsTrigger implements NotificationRuleTrigger { + @Serial + private static final long serialVersionUID = -4423112145409424886L; + private final TenantId tenantId; private final LimitedApi api; private final EntityId limitLevel; @@ -45,8 +49,8 @@ public class RateLimitsTrigger implements NotificationRuleTrigger { @Override - public boolean deduplicate() { - return true; + public DeduplicationStrategy getDeduplicationStrategy() { + return DeduplicationStrategy.ALL; } @Override diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/ResourcesShortageTrigger.java b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/ResourcesShortageTrigger.java index f12c80d5db..be2485c959 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/ResourcesShortageTrigger.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/notification/rule/trigger/ResourcesShortageTrigger.java @@ -33,6 +33,8 @@ public class ResourcesShortageTrigger implements NotificationRuleTrigger { private Resource resource; private Long usage; + private String serviceId; + private String serviceType; @Override public TenantId getTenantId() { @@ -45,13 +47,13 @@ public class ResourcesShortageTrigger implements NotificationRuleTrigger { } @Override - public boolean deduplicate() { - return true; + public DeduplicationStrategy getDeduplicationStrategy() { + return DeduplicationStrategy.ONLY_MATCHING; } @Override public String getDeduplicationKey() { - return resource.name(); + return String.join(":", resource.name(), serviceId, serviceType); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/notification/RemoteNotificationRuleProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/notification/RemoteNotificationRuleProcessor.java index 194f0ce962..a410ac5d04 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/notification/RemoteNotificationRuleProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/notification/RemoteNotificationRuleProcessor.java @@ -22,6 +22,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.JavaSerDesUtil; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger; +import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger.DeduplicationStrategy; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -47,7 +48,7 @@ public class RemoteNotificationRuleProcessor implements NotificationRuleProcesso @Override public void process(NotificationRuleTrigger trigger) { try { - if (trigger.deduplicate() && deduplicationService.alreadyProcessed(trigger)) { + if (!DeduplicationStrategy.NONE.equals(trigger.getDeduplicationStrategy()) && deduplicationService.alreadyProcessed(trigger)) { return; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotifications.java b/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotifications.java index efd69a4e61..7cee3d04ae 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotifications.java +++ b/dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotifications.java @@ -376,7 +376,7 @@ public class DefaultNotifications { public static final DefaultNotification resourcesShortage = DefaultNotification.builder() .name("Resources shortage notification") .type(NotificationType.RESOURCES_SHORTAGE) - .subject("Warning: ${resource} shortage") + .subject("Warning: ${resource} shortage for ${serviceId}") .text("${resource} usage is at ${usage}%.") .icon("warning") .rule(DefaultRule.builder() diff --git a/ui-ngx/src/app/core/http/ota-package.service.ts b/ui-ngx/src/app/core/http/ota-package.service.ts index 09f887c038..0ddbfc44de 100644 --- a/ui-ngx/src/app/core/http/ota-package.service.ts +++ b/ui-ngx/src/app/core/http/ota-package.service.ts @@ -129,15 +129,16 @@ export class OtaPackageService { } return forkJoin(tasks).pipe( mergeMap(([deviceFirmwareUpdate, deviceSoftwareUpdate]) => { - let text = ''; + const lines: string[] = []; if (deviceFirmwareUpdate > 0) { - text += this.translate.instant('ota-update.change-firmware', {count: deviceFirmwareUpdate}); + lines.push(this.translate.instant('ota-update.change-firmware', {count: deviceFirmwareUpdate})); } if (deviceSoftwareUpdate > 0) { - text += text.length ? ' ' : ''; - text += this.translate.instant('ota-update.change-software', {count: deviceSoftwareUpdate}); + lines.push(this.translate.instant('ota-update.change-software', {count: deviceSoftwareUpdate})); } - return text !== '' ? this.dialogService.confirm('', text, null, this.translate.instant('common.proceed')) : of(true); + return lines.length + ? this.dialogService.confirm(this.translate.instant('ota-update.change-ota-setting-title'), lines.join('
'), null, this.translate.instant('common.proceed')) + : of(true); }) ); } diff --git a/ui-ngx/src/app/modules/home/components/dashboard-page/edit-widget.component.scss b/ui-ngx/src/app/modules/home/components/dashboard-page/edit-widget.component.scss index 69cbeb7f5f..2ff02a1f16 100644 --- a/ui-ngx/src/app/modules/home/components/dashboard-page/edit-widget.component.scss +++ b/ui-ngx/src/app/modules/home/components/dashboard-page/edit-widget.component.scss @@ -21,7 +21,7 @@ right: 0; bottom: 0; background: #fff; - z-index: 5; + z-index: 100; } .widget-preview-section { position: absolute; diff --git a/ui-ngx/src/app/modules/home/components/dashboard-page/states/manage-dashboard-states-dialog.component.ts b/ui-ngx/src/app/modules/home/components/dashboard-page/states/manage-dashboard-states-dialog.component.ts index 511abff8e5..107d5c2686 100644 --- a/ui-ngx/src/app/modules/home/components/dashboard-page/states/manage-dashboard-states-dialog.component.ts +++ b/ui-ngx/src/app/modules/home/components/dashboard-page/states/manage-dashboard-states-dialog.component.ts @@ -14,7 +14,16 @@ /// limitations under the License. /// -import { AfterViewInit, Component, ElementRef, Inject, OnInit, SkipSelf, ViewChild } from '@angular/core'; +import { + AfterViewInit, + Component, + ElementRef, + Inject, + OnInit, + SecurityContext, + SkipSelf, + ViewChild +} from '@angular/core'; import { ErrorStateMatcher } from '@angular/material/core'; import { MAT_DIALOG_DATA, MatDialog, MatDialogRef } from '@angular/material/dialog'; import { Store } from '@ngrx/store'; @@ -42,6 +51,7 @@ import { } from '@home/components/dashboard-page/states/dashboard-state-dialog.component'; import { UtilsService } from '@core/services/utils.service'; import { Widget } from '@shared/models/widget.models'; +import { DomSanitizer } from '@angular/platform-browser'; export interface ManageDashboardStatesDialogData { states: {[id: string]: DashboardState }; @@ -87,7 +97,8 @@ export class ManageDashboardStatesDialogComponent private translate: TranslateService, private dialogs: DialogService, private utils: UtilsService, - private dialog: MatDialog) { + private dialog: MatDialog, + private sanitizer: DomSanitizer) { super(store, router, dialogRef); this.states = this.data.states; @@ -148,7 +159,8 @@ export class ManageDashboardStatesDialogComponent } const title = this.translate.instant('dashboard.delete-state-title'); const content = this.translate.instant('dashboard.delete-state-text', {stateName: state.name}); - this.dialogs.confirm(title, content, this.translate.instant('action.no'), + const safeContent = this.sanitizer.sanitize(SecurityContext.HTML, content); + this.dialogs.confirm(title, safeContent, this.translate.instant('action.no'), this.translate.instant('action.yes')).subscribe( (res) => { if (res) { diff --git a/ui-ngx/src/app/modules/home/components/profile/device/device-profile-transport-configuration.component.ts b/ui-ngx/src/app/modules/home/components/profile/device/device-profile-transport-configuration.component.ts index 1a696072eb..951042bb2d 100644 --- a/ui-ngx/src/app/modules/home/components/profile/device/device-profile-transport-configuration.component.ts +++ b/ui-ngx/src/app/modules/home/components/profile/device/device-profile-transport-configuration.component.ts @@ -103,7 +103,7 @@ export class DeviceProfileTransportConfigurationComponent implements ControlValu delete configuration.type; } setTimeout(() => { - this.deviceProfileTransportConfigurationFormGroup.patchValue({configuration}, {emitEvent: false}); + this.deviceProfileTransportConfigurationFormGroup.patchValue({configuration}, {emitEvent: this.isAdd}); }, 0); } diff --git a/ui-ngx/src/app/modules/home/components/profile/device/lwm2m/lwm2m-observe-attr-telemetry-instances.component.scss b/ui-ngx/src/app/modules/home/components/profile/device/lwm2m/lwm2m-observe-attr-telemetry-instances.component.scss index d86864d1f3..9ce049107b 100644 --- a/ui-ngx/src/app/modules/home/components/profile/device/lwm2m/lwm2m-observe-attr-telemetry-instances.component.scss +++ b/ui-ngx/src/app/modules/home/components/profile/device/lwm2m/lwm2m-observe-attr-telemetry-instances.component.scss @@ -31,4 +31,10 @@ .mat-expansion-panel-header-title { margin-right: 0; } + + &::ng-deep { + .mat-content.mat-content-hide-toggle { + margin-right: 0; + } + } } diff --git a/ui-ngx/src/app/modules/home/components/profile/device/lwm2m/lwm2m-observe-attr-telemetry-resources.component.html b/ui-ngx/src/app/modules/home/components/profile/device/lwm2m/lwm2m-observe-attr-telemetry-resources.component.html index 06b451a1f8..63a5dfbd1f 100644 --- a/ui-ngx/src/app/modules/home/components/profile/device/lwm2m/lwm2m-observe-attr-telemetry-resources.component.html +++ b/ui-ngx/src/app/modules/home/components/profile/device/lwm2m/lwm2m-observe-attr-telemetry-resources.component.html @@ -49,7 +49,7 @@
- diff --git a/ui-ngx/src/app/modules/home/components/widget/action/widget-action-dialog.component.ts b/ui-ngx/src/app/modules/home/components/widget/action/widget-action-dialog.component.ts index 3eabf71899..bfe14832c4 100644 --- a/ui-ngx/src/app/modules/home/components/widget/action/widget-action-dialog.component.ts +++ b/ui-ngx/src/app/modules/home/components/widget/action/widget-action-dialog.component.ts @@ -146,6 +146,7 @@ export class WidgetActionDialogComponent extends DialogComponent item.visible); this.showLegend = this.settings.showLegend && !!this.rangeItems.length; diff --git a/ui-ngx/src/app/modules/home/components/widget/lib/chart/range-chart-widget.models.ts b/ui-ngx/src/app/modules/home/components/widget/lib/chart/range-chart-widget.models.ts index 7134a924fe..f4210e2203 100644 --- a/ui-ngx/src/app/modules/home/components/widget/lib/chart/range-chart-widget.models.ts +++ b/ui-ngx/src/app/modules/home/components/widget/lib/chart/range-chart-widget.models.ts @@ -22,6 +22,7 @@ import { Font, simpleDateFormat, sortedColorRange, + ValueFormatProcessor, ValueSourceType } from '@shared/models/widget-settings.models'; import { LegendPosition } from '@shared/models/widget.models'; @@ -291,21 +292,21 @@ export const rangeChartTimeSeriesKeySettings = (settings: RangeChartWidgetSettin } }); -export const toRangeItems = (colorRanges: Array, convertValue: (x: number) => number): RangeItem[] => { +export const toRangeItems = (colorRanges: Array, valueFormat: ValueFormatProcessor): RangeItem[] => { const rangeItems: RangeItem[] = []; let counter = 0; const ranges = sortedColorRange(filterIncludingColorRanges(colorRanges)).filter(r => isNumber(r.from) || isNumber(r.to)); for (let i = 0; i < ranges.length; i++) { const range = ranges[i]; let from = range.from; - const to = isDefinedAndNotNull(range.to) ? convertValue(range.to) : range.to; + const to = isDefinedAndNotNull(range.to) ? Number(valueFormat.format(range.to)) : range.to; if (i > 0) { const prevRange = ranges[i - 1]; if (isNumber(prevRange.to) && isNumber(from) && from < prevRange.to) { from = prevRange.to; } } - from = isDefinedAndNotNull(from) ? convertValue(from) : from; + from = isDefinedAndNotNull(from) ? Number(valueFormat.format(from)) : from; rangeItems.push( { index: counter++, diff --git a/ui-ngx/src/app/modules/home/components/widget/lib/chart/time-series-chart-tooltip.models.ts b/ui-ngx/src/app/modules/home/components/widget/lib/chart/time-series-chart-tooltip.models.ts index 713e7b9373..22c16c0219 100644 --- a/ui-ngx/src/app/modules/home/components/widget/lib/chart/time-series-chart-tooltip.models.ts +++ b/ui-ngx/src/app/modules/home/components/widget/lib/chart/time-series-chart-tooltip.models.ts @@ -17,9 +17,7 @@ import { isFunction } from '@core/utils'; import { FormattedData } from '@shared/models/widget.models'; import { DateFormatProcessor, DateFormatSettings, Font } from '@shared/models/widget-settings.models'; -import { - TimeSeriesChartDataItem, -} from '@home/components/widget/lib/chart/time-series-chart.models'; +import { TimeSeriesChartDataItem } from '@home/components/widget/lib/chart/time-series-chart.models'; import { Renderer2, SecurityContext } from '@angular/core'; import { DomSanitizer } from '@angular/platform-browser'; import { CallbackDataParams } from 'echarts/types/dist/shared'; @@ -104,6 +102,9 @@ export class TimeSeriesChartTooltip { if (!tooltipParams.items.length && !tooltipParams.comparisonItems.length) { return null; } + if (this.settings.tooltipHideZeroFalse && !tooltipParams.items.some(value => value.param.value[1] && value.param.value[1] !== 'false')) { + return undefined; + } const tooltipElement: HTMLElement = this.renderer.createElement('div'); this.renderer.setStyle(tooltipElement, 'display', 'flex'); @@ -130,7 +131,7 @@ export class TimeSeriesChartTooltip { this.renderer.appendChild(tooltipItemsElement, this.constructTooltipDateElement(items[0].param, interval)); } for (const item of items) { - if (!this.settings.tooltipHideZeroFalse || item.param.value[1]) { + if (!this.settings.tooltipHideZeroFalse || (item.param.value[1] && item.param.value[1] !== 'false')) { this.renderer.appendChild(tooltipItemsElement, this.constructTooltipSeriesElement(item)); } } diff --git a/ui-ngx/src/app/modules/home/components/widget/lib/digital-gauge.ts b/ui-ngx/src/app/modules/home/components/widget/lib/digital-gauge.ts index 81ab5d6bd9..f88be8783b 100644 --- a/ui-ngx/src/app/modules/home/components/widget/lib/digital-gauge.ts +++ b/ui-ngx/src/app/modules/home/components/widget/lib/digital-gauge.ts @@ -125,6 +125,7 @@ export class TbCanvasDigitalGauge { this.barColorProcessor = ColorProcessor.fromSettings(settings.barColor, this.ctx); this.valueFormat = ValueFormatProcessor.fromSettings(this.ctx.$injector, { units: this.localSettings.units, + decimals: this.localSettings.decimals, ignoreUnitSymbol: true }); diff --git a/ui-ngx/src/app/modules/home/pages/device-profile/device-profile-tabs.component.ts b/ui-ngx/src/app/modules/home/pages/device-profile/device-profile-tabs.component.ts index c7d1ddc9d0..1e4b735ff7 100644 --- a/ui-ngx/src/app/modules/home/pages/device-profile/device-profile-tabs.component.ts +++ b/ui-ngx/src/app/modules/home/pages/device-profile/device-profile-tabs.component.ts @@ -14,7 +14,7 @@ /// limitations under the License. /// -import { Component, DestroyRef } from '@angular/core'; +import { Component, DestroyRef, OnInit } from '@angular/core'; import { Store } from '@ngrx/store'; import { AppState } from '@core/core.state'; import { EntityTabsComponent } from '../../components/entity/entity-tabs.component'; @@ -31,7 +31,7 @@ import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; templateUrl: './device-profile-tabs.component.html', styleUrls: [] }) -export class DeviceProfileTabsComponent extends EntityTabsComponent { +export class DeviceProfileTabsComponent extends EntityTabsComponent implements OnInit { deviceTransportTypes = Object.values(DeviceTransportType); @@ -55,4 +55,9 @@ export class DeviceProfileTabsComponent extends EntityTabsComponent {{ 'notification.link-required' | translate }} + + {{ 'notification.link-max-length' | translate : + {length: actionButtonConfigForm.get('link').getError('maxlength').requiredLength} + }} + {{ 'notification.subject-required' | translate }} + + {{'notification.subject-max-length' | translate : + {length: templateConfigurationForm.get('WEB.subject').getError('maxlength').requiredLength} + }} + notification.message @@ -56,6 +61,11 @@ {{ 'notification.message-required' | translate }} + + {{ 'notification.message-max-length' | translate : + {length: templateConfigurationForm.get('WEB.body').getError('maxlength').requiredLength} + }} +
@@ -194,6 +204,11 @@ {{ 'notification.subject-required' | translate }} + + {{'notification.subject-max-length' | translate : + {length: templateConfigurationForm.get('EMAIL.subject').getError('maxlength').requiredLength} + }} +