Merge branch 'develop/3.5' of github.com:thingsboard/thingsboard into fix/vulnerabilities
This commit is contained in:
commit
3a9cce9bd6
@ -107,7 +107,7 @@ CREATE TABLE IF NOT EXISTS notification_template (
|
||||
tenant_id UUID NOT NULL,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
notification_type VARCHAR(50) NOT NULL,
|
||||
configuration VARCHAR(10000) NOT NULL,
|
||||
configuration VARCHAR(10000000) NOT NULL,
|
||||
CONSTRAINT uq_notification_template_name UNIQUE (tenant_id, name)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_notification_template_tenant_id_created_time ON notification_template(tenant_id, created_time DESC);
|
||||
@ -132,7 +132,7 @@ CREATE TABLE IF NOT EXISTS notification_request (
|
||||
tenant_id UUID NOT NULL,
|
||||
targets VARCHAR(10000) NOT NULL,
|
||||
template_id UUID,
|
||||
template VARCHAR(10000),
|
||||
template VARCHAR(10000000),
|
||||
info VARCHAR(1000),
|
||||
additional_config VARCHAR(1000),
|
||||
originator_entity_id UUID,
|
||||
|
||||
@ -67,7 +67,11 @@ public class NotificationRuleController extends BaseController {
|
||||
throw new IllegalArgumentException("Trigger type " + triggerType + " is not available");
|
||||
}
|
||||
|
||||
return doSaveAndLog(EntityType.NOTIFICATION_RULE, notificationRule, notificationRuleService::saveNotificationRule);
|
||||
boolean created = notificationRule.getId() == null;
|
||||
notificationRule = doSaveAndLog(EntityType.NOTIFICATION_RULE, notificationRule, notificationRuleService::saveNotificationRule);
|
||||
tbClusterService.broadcastEntityStateChangeEvent(user.getTenantId(), notificationRule.getId(), created ?
|
||||
ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
|
||||
return notificationRule;
|
||||
}
|
||||
|
||||
@GetMapping("/rule/{id}")
|
||||
|
||||
@ -174,7 +174,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
||||
|
||||
@Override
|
||||
public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> outputStream) {
|
||||
return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService).getInputStream();
|
||||
return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService, this.maxInboundMessageSize).getInputStream();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -105,16 +105,20 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
|
||||
private EdgeVersion edgeVersion;
|
||||
|
||||
private int maxInboundMessageSize;
|
||||
private int clientMaxInboundMessageSize;
|
||||
|
||||
private ScheduledExecutorService sendDownlinkExecutorService;
|
||||
|
||||
EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
|
||||
Consumer<EdgeId> sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService) {
|
||||
Consumer<EdgeId> sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, int maxInboundMessageSize) {
|
||||
this.sessionId = UUID.randomUUID();
|
||||
this.ctx = ctx;
|
||||
this.outputStream = outputStream;
|
||||
this.sessionOpenListener = sessionOpenListener;
|
||||
this.sessionCloseListener = sessionCloseListener;
|
||||
this.sendDownlinkExecutorService = sendDownlinkExecutorService;
|
||||
this.maxInboundMessageSize = maxInboundMessageSize;
|
||||
initInputStream();
|
||||
}
|
||||
|
||||
@ -130,6 +134,10 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
if (ConnectResponseCode.ACCEPTED != responseMsg.getResponseCode()) {
|
||||
outputStream.onError(new RuntimeException(responseMsg.getErrorMsg()));
|
||||
} else {
|
||||
if (requestMsg.getConnectRequestMsg().hasMaxInboundMessageSize()) {
|
||||
log.debug("[{}] Client max inbound message size: {}", sessionId, requestMsg.getConnectRequestMsg().getMaxInboundMessageSize());
|
||||
clientMaxInboundMessageSize = requestMsg.getConnectRequestMsg().getMaxInboundMessageSize();
|
||||
}
|
||||
connected = true;
|
||||
}
|
||||
}
|
||||
@ -408,9 +416,17 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
}
|
||||
log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, copy.size());
|
||||
for (DownlinkMsg downlinkMsg : copy) {
|
||||
sendDownlinkMsg(ResponseMsg.newBuilder()
|
||||
.setDownlinkMsg(downlinkMsg)
|
||||
.build());
|
||||
if (this.clientMaxInboundMessageSize != 0 && downlinkMsg.getSerializedSize() > this.clientMaxInboundMessageSize) {
|
||||
log.error("[{}][{}][{}] Downlink msg size [{}] exceeds client max inbound message size [{}]. Skipping this message. " +
|
||||
"Please increase value of CLOUD_RPC_MAX_INBOUND_MESSAGE_SIZE env variable on the edge and restart it." +
|
||||
"Message {}", edge.getTenantId(), edge.getId(), this.sessionId, downlinkMsg.getSerializedSize(),
|
||||
this.clientMaxInboundMessageSize, downlinkMsg);
|
||||
sessionState.getPendingMsgsMap().remove(downlinkMsg.getDownlinkMsgId());
|
||||
} else {
|
||||
sendDownlinkMsg(ResponseMsg.newBuilder()
|
||||
.setDownlinkMsg(downlinkMsg)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
if (attempt < MAX_DOWNLINK_ATTEMPTS) {
|
||||
scheduleDownlinkMsgsPackSend(attempt + 1);
|
||||
@ -638,7 +654,9 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
return ConnectResponseMsg.newBuilder()
|
||||
.setResponseCode(ConnectResponseCode.ACCEPTED)
|
||||
.setErrorMsg("")
|
||||
.setConfiguration(ctx.getEdgeMsgConstructor().constructEdgeConfiguration(edge)).build();
|
||||
.setConfiguration(ctx.getEdgeMsgConstructor().constructEdgeConfiguration(edge))
|
||||
.setMaxInboundMessageSize(maxInboundMessageSize)
|
||||
.build();
|
||||
}
|
||||
return ConnectResponseMsg.newBuilder()
|
||||
.setResponseCode(ConnectResponseCode.BAD_CREDENTIALS)
|
||||
|
||||
@ -35,12 +35,14 @@ import org.thingsboard.server.common.data.notification.rule.NotificationRule;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerConfig;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.RuleEngineMsgTrigger;
|
||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.dao.notification.NotificationRequestService;
|
||||
import org.thingsboard.server.dao.notification.NotificationRuleService;
|
||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.service.notification.rule.cache.NotificationRulesCache;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.service.executors.NotificationExecutorService;
|
||||
import org.thingsboard.server.service.notification.rule.trigger.NotificationRuleTriggerProcessor;
|
||||
import org.thingsboard.server.service.notification.rule.trigger.RuleEngineMsgNotificationRuleTriggerProcessor;
|
||||
@ -60,8 +62,9 @@ import java.util.stream.Collectors;
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public class DefaultNotificationRuleProcessor implements NotificationRuleProcessor {
|
||||
|
||||
private final NotificationRuleService notificationRuleService;
|
||||
private final NotificationRulesCache notificationRulesCache;
|
||||
private final NotificationRequestService notificationRequestService;
|
||||
private final PartitionService partitionService;
|
||||
@Autowired @Lazy
|
||||
private NotificationCenter notificationCenter;
|
||||
private final NotificationExecutorService notificationExecutor;
|
||||
@ -74,15 +77,19 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
|
||||
if (triggerType == null) return;
|
||||
TenantId tenantId = triggerType.isTenantLevel() ? trigger.getTenantId() : TenantId.SYS_TENANT_ID;
|
||||
|
||||
List<NotificationRule> rules = notificationRuleService.findNotificationRulesByTenantIdAndTriggerType(tenantId, triggerType);
|
||||
for (NotificationRule rule : rules) {
|
||||
notificationExecutor.submit(() -> {
|
||||
try {
|
||||
processNotificationRule(rule, trigger);
|
||||
} catch (Throwable e) {
|
||||
log.error("Failed to process notification rule {} for trigger type {} with trigger object {}", rule.getId(), rule.getTriggerType(), trigger, e);
|
||||
}
|
||||
});
|
||||
try {
|
||||
List<NotificationRule> rules = notificationRulesCache.get(tenantId, triggerType);
|
||||
for (NotificationRule rule : rules) {
|
||||
notificationExecutor.submit(() -> {
|
||||
try {
|
||||
processNotificationRule(rule, trigger);
|
||||
} catch (Throwable e) {
|
||||
log.error("Failed to process notification rule {} for trigger type {} with trigger object {}", rule.getId(), rule.getTriggerType(), trigger, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.error("Failed to process notification rules for trigger: {}", trigger, e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -169,12 +176,14 @@ public class DefaultNotificationRuleProcessor implements NotificationRuleProcess
|
||||
|
||||
TenantId tenantId = componentLifecycleMsg.getTenantId();
|
||||
NotificationRuleId notificationRuleId = (NotificationRuleId) componentLifecycleMsg.getEntityId();
|
||||
notificationExecutor.submit(() -> {
|
||||
List<NotificationRequestId> scheduledForRule = notificationRequestService.findNotificationRequestsIdsByStatusAndRuleId(tenantId, NotificationRequestStatus.SCHEDULED, notificationRuleId);
|
||||
for (NotificationRequestId notificationRequestId : scheduledForRule) {
|
||||
notificationCenter.deleteNotificationRequest(tenantId, notificationRequestId);
|
||||
}
|
||||
});
|
||||
if (partitionService.isMyPartition(ServiceType.TB_CORE, tenantId, notificationRuleId)) {
|
||||
notificationExecutor.submit(() -> {
|
||||
List<NotificationRequestId> scheduledForRule = notificationRequestService.findNotificationRequestsIdsByStatusAndRuleId(tenantId, NotificationRequestStatus.SCHEDULED, notificationRuleId);
|
||||
for (NotificationRequestId notificationRequestId : scheduledForRule) {
|
||||
notificationCenter.deleteNotificationRequest(tenantId, notificationRequestId);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Autowired
|
||||
|
||||
@ -0,0 +1,116 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.notification.rule.cache;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import lombok.Data;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.notification.rule.NotificationRule;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.dao.notification.NotificationRuleService;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class DefaultNotificationRulesCache implements NotificationRulesCache {
|
||||
|
||||
private final NotificationRuleService notificationRuleService;
|
||||
|
||||
@Value("${cache.notificationRules.maxSize:1000}")
|
||||
private int cacheMaxSize;
|
||||
@Value("${cache.notificationRules.timeToLiveInMinutes:30}")
|
||||
private int cacheValueTtl;
|
||||
private Cache<CacheKey, List<NotificationRule>> cache;
|
||||
|
||||
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
cache = Caffeine.newBuilder()
|
||||
.maximumSize(cacheMaxSize)
|
||||
.expireAfterAccess(cacheValueTtl, TimeUnit.MINUTES)
|
||||
.build();
|
||||
}
|
||||
|
||||
@EventListener(ComponentLifecycleMsg.class)
|
||||
public void onComponentLifecycleEvent(ComponentLifecycleMsg event) {
|
||||
switch (event.getEntityId().getEntityType()) {
|
||||
case NOTIFICATION_RULE:
|
||||
evict(event.getTenantId()); // TODO: evict by trigger type of the rule
|
||||
break;
|
||||
case TENANT:
|
||||
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
|
||||
lock.writeLock().lock(); // locking in case rules for tenant are fetched while evicting
|
||||
try {
|
||||
evict(event.getTenantId());
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NotificationRule> get(TenantId tenantId, NotificationRuleTriggerType triggerType) {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
log.trace("Retrieving notification rules of type {} for tenant {} from cache", triggerType, tenantId);
|
||||
return cache.get(key(tenantId, triggerType), k -> {
|
||||
List<NotificationRule> rules = notificationRuleService.findNotificationRulesByTenantIdAndTriggerType(tenantId, triggerType);
|
||||
log.trace("Fetched notification rules of type {} for tenant {} (count: {})", triggerType, tenantId, rules.size());
|
||||
return !rules.isEmpty() ? rules : Collections.emptyList();
|
||||
});
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void evict(TenantId tenantId) {
|
||||
cache.invalidateAll(Arrays.stream(NotificationRuleTriggerType.values())
|
||||
.map(triggerType -> key(tenantId, triggerType))
|
||||
.collect(Collectors.toList()));
|
||||
log.trace("Evicted all notification rules for tenant {} from cache", tenantId);
|
||||
}
|
||||
|
||||
private static CacheKey key(TenantId tenantId, NotificationRuleTriggerType triggerType) {
|
||||
return new CacheKey(tenantId, triggerType);
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class CacheKey {
|
||||
private final TenantId tenantId;
|
||||
private final NotificationRuleTriggerType triggerType;
|
||||
}
|
||||
|
||||
}
|
||||
@ -13,25 +13,16 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.notification.cache;
|
||||
package org.thingsboard.server.service.notification.rule.cache;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.notification.rule.NotificationRule;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class NotificationRuleCacheValue implements Serializable {
|
||||
public interface NotificationRulesCache {
|
||||
|
||||
private static final long serialVersionUID = 9503216785105415L;
|
||||
|
||||
private List<NotificationRule> notificationRules;
|
||||
List<NotificationRule> get(TenantId tenantId, NotificationRuleTriggerType triggerType);
|
||||
|
||||
}
|
||||
@ -15,16 +15,15 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.notification.rule.trigger;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.UpdateMessage;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.notification.info.NewPlatformVersionNotificationInfo;
|
||||
import org.thingsboard.server.common.data.notification.info.RuleOriginatedNotificationInfo;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NewPlatformVersionNotificationRuleTriggerConfig;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersionTrigger;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersionTrigger;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
|
||||
@ -36,17 +35,21 @@ public class NewPlatformVersionTriggerProcessor implements NotificationRuleTrigg
|
||||
|
||||
@Override
|
||||
public boolean matchesFilter(NewPlatformVersionTrigger trigger, NewPlatformVersionNotificationRuleTriggerConfig triggerConfig) {
|
||||
// todo: don't send repetitive notification after platform restart?
|
||||
if (!partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition()) {
|
||||
if (!partitionService.isMyPartition(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID)) {
|
||||
return false;
|
||||
}
|
||||
return trigger.getMessage().isUpdateAvailable();
|
||||
return trigger.getUpdateInfo().isUpdateAvailable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RuleOriginatedNotificationInfo constructNotificationInfo(NewPlatformVersionTrigger trigger) {
|
||||
UpdateMessage updateInfo = trigger.getUpdateInfo();
|
||||
return NewPlatformVersionNotificationInfo.builder()
|
||||
.message(JacksonUtil.convertValue(trigger.getMessage(), new TypeReference<>() {}))
|
||||
.latestVersion(updateInfo.getLatestVersion())
|
||||
.latestVersionReleaseNotesUrl(updateInfo.getLatestVersionReleaseNotesUrl())
|
||||
.upgradeInstructionsUrl(updateInfo.getUpgradeInstructionsUrl())
|
||||
.currentVersion(updateInfo.getCurrentVersion())
|
||||
.currentVersionReleaseNotesUrl(updateInfo.getCurrentVersionReleaseNotesUrl())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@ -414,7 +414,8 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
|| entityType.equals(EntityType.API_USAGE_STATE)
|
||||
|| (entityType.equals(EntityType.DEVICE) && msg.getEvent() == ComponentLifecycleEvent.UPDATED)
|
||||
|| entityType.equals(EntityType.ENTITY_VIEW)
|
||||
|| entityType.equals(EntityType.EDGE)) {
|
||||
|| entityType.equals(EntityType.EDGE)
|
||||
|| entityType.equals(EntityType.NOTIFICATION_RULE)) {
|
||||
TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
|
||||
Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
|
||||
for (String serviceId : tbCoreServices) {
|
||||
|
||||
@ -35,12 +35,12 @@ import org.thingsboard.server.common.data.id.UserId;
|
||||
import org.thingsboard.server.common.data.rpc.RpcError;
|
||||
import org.thingsboard.server.common.msg.MsgType;
|
||||
import org.thingsboard.server.common.msg.TbActorMsg;
|
||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
|
||||
@ -274,9 +274,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
TransportProtos.NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = toCoreMsg.getNotificationSchedulerServiceMsg();
|
||||
log.trace("[{}] Forwarding message to notification scheduler service {}", id, toCoreMsg.getNotificationSchedulerServiceMsg());
|
||||
forwardToNotificationSchedulerService(notificationSchedulerServiceMsg, callback);
|
||||
} else if (toCoreMsg.hasNotificationRuleProcessorMsg()) {
|
||||
Optional<NotificationRuleTrigger> notificationRuleTrigger = encodingService.decode(toCoreMsg.getNotificationRuleProcessorMsg().getTrigger().toByteArray());
|
||||
notificationRuleTrigger.ifPresent(notificationRuleProcessor::process);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.warn("[{}] Failed to process message: {}", id, msg, e);
|
||||
@ -361,6 +358,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
callback.onSuccess();
|
||||
} else if (toCoreNotification.hasToSubscriptionMgrMsg()) {
|
||||
forwardToSubMgrService(toCoreNotification.getToSubscriptionMgrMsg(), callback);
|
||||
} else if (toCoreNotification.hasNotificationRuleProcessorMsg()) {
|
||||
Optional<NotificationRuleTrigger> notificationRuleTrigger = encodingService.decode(toCoreNotification
|
||||
.getNotificationRuleProcessorMsg().getTrigger().toByteArray());
|
||||
notificationRuleTrigger.ifPresent(notificationRuleProcessor::process);
|
||||
callback.onSuccess();
|
||||
}
|
||||
if (statsEnabled) {
|
||||
stats.log(toCoreNotification);
|
||||
|
||||
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.update;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -28,6 +27,7 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.UpdateMessage;
|
||||
import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersionTrigger;
|
||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||
import org.thingsboard.server.queue.util.AfterStartUp;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
@ -74,8 +74,8 @@ public class DefaultUpdateService implements UpdateService {
|
||||
private String version;
|
||||
private UUID instanceId = null;
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
|
||||
public void init() {
|
||||
version = buildProperties != null ? buildProperties.getVersion() : "unknown";
|
||||
updateMessage = new UpdateMessage(false, version, "", "", "", "");
|
||||
if (updatesEnabled) {
|
||||
@ -132,7 +132,7 @@ public class DefaultUpdateService implements UpdateService {
|
||||
updateMessage = restClient.postForObject(UPDATE_SERVER_BASE_URL + "/api/v2/thingsboard/updates", request, UpdateMessage.class);
|
||||
if (updateMessage.isUpdateAvailable() && !updateMessage.equals(prevUpdateMessage)) {
|
||||
notificationRuleProcessor.process(NewPlatformVersionTrigger.builder()
|
||||
.message(updateMessage)
|
||||
.updateInfo(updateMessage)
|
||||
.build());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -437,9 +437,6 @@ cache:
|
||||
assetProfiles:
|
||||
timeToLiveInMinutes: "${CACHE_SPECS_ASSET_PROFILES_TTL:1440}"
|
||||
maxSize: "${CACHE_SPECS_ASSET_PROFILES_MAX_SIZE:10000}"
|
||||
notificationRules:
|
||||
timeToLiveInMinutes: "${CACHE_SPECS_NOTIFICATION_RULES_TTL:1440}"
|
||||
maxSize: "${CACHE_SPECS_NOTIFICATION_RULES_MAX_SIZE:10000}"
|
||||
notificationSettings:
|
||||
timeToLiveInMinutes: "${CACHE_SPECS_NOTIFICATION_SETTINGS_TTL:10}"
|
||||
maxSize: "${CACHE_SPECS_NOTIFICATION_SETTINGS_MAX_SIZE:1000}"
|
||||
@ -480,6 +477,10 @@ cache:
|
||||
entityCount:
|
||||
timeToLiveInMinutes: "${CACHE_SPECS_ENTITY_COUNT_TTL:1440}"
|
||||
maxSize: "${CACHE_SPECS_ENTITY_COUNT_MAX_SIZE:100000}"
|
||||
# deliberately placed outside 'specs' group above
|
||||
notificationRules:
|
||||
timeToLiveInMinutes: "${CACHE_SPECS_NOTIFICATION_RULES_TTL:30}"
|
||||
maxSize: "${CACHE_SPECS_NOTIFICATION_RULES_MAX_SIZE:1000}"
|
||||
|
||||
#Disable this because it is not required.
|
||||
spring.data.redis.repositories.enabled: false
|
||||
@ -960,7 +961,6 @@ edges:
|
||||
scheduler_pool_size: "${EDGES_SCHEDULER_POOL_SIZE:1}"
|
||||
send_scheduler_pool_size: "${EDGES_SEND_SCHEDULER_POOL_SIZE:1}"
|
||||
grpc_callback_thread_pool_size: "${EDGES_GRPC_CALLBACK_POOL_SIZE:1}"
|
||||
edge_events_ttl: "${EDGES_EDGE_EVENTS_TTL:0}"
|
||||
state:
|
||||
persistToTelemetry: "${EDGES_PERSIST_STATE_TO_TELEMETRY:false}"
|
||||
|
||||
|
||||
@ -104,13 +104,14 @@ public class EdgeImitator {
|
||||
ignoredTypes = new ArrayList<>();
|
||||
this.routingKey = routingKey;
|
||||
this.routingSecret = routingSecret;
|
||||
setEdgeCredentials("rpcHost", host);
|
||||
setEdgeCredentials("rpcPort", port);
|
||||
setEdgeCredentials("timeoutSecs", 3);
|
||||
setEdgeCredentials("keepAliveTimeSec", 300);
|
||||
updateEdgeClientFields("rpcHost", host);
|
||||
updateEdgeClientFields("rpcPort", port);
|
||||
updateEdgeClientFields("timeoutSecs", 3);
|
||||
updateEdgeClientFields("keepAliveTimeSec", 300);
|
||||
updateEdgeClientFields("maxInboundMessageSize", 4194304);
|
||||
}
|
||||
|
||||
private void setEdgeCredentials(String fieldName, Object value) throws NoSuchFieldException, IllegalAccessException {
|
||||
private void updateEdgeClientFields(String fieldName, Object value) throws NoSuchFieldException, IllegalAccessException {
|
||||
Field fieldToSet = edgeRpcClient.getClass().getDeclaredField(fieldName);
|
||||
fieldToSet.setAccessible(true);
|
||||
fieldToSet.set(edgeRpcClient, value);
|
||||
|
||||
@ -968,7 +968,6 @@ message ToCoreMsg {
|
||||
EdgeNotificationMsgProto edgeNotificationMsg = 5;
|
||||
DeviceActivityProto deviceActivityMsg = 6;
|
||||
NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = 7;
|
||||
NotificationRuleProcessorMsg notificationRuleProcessorMsg = 8;
|
||||
}
|
||||
|
||||
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */
|
||||
@ -983,6 +982,7 @@ message ToCoreNotificationMsg {
|
||||
bytes toEdgeSyncRequestMsg = 8;
|
||||
bytes fromEdgeSyncResponseMsg = 9;
|
||||
SubscriptionMgrMsgProto toSubscriptionMgrMsg = 10;
|
||||
NotificationRuleProcessorMsg notificationRuleProcessorMsg = 11;
|
||||
}
|
||||
|
||||
/* Messages that are handled by ThingsBoard RuleEngine Service */
|
||||
|
||||
@ -29,7 +29,6 @@ public class CacheConstants {
|
||||
public static final String TENANTS_CACHE = "tenants";
|
||||
public static final String TENANTS_EXIST_CACHE = "tenantsExist";
|
||||
public static final String DEVICE_PROFILE_CACHE = "deviceProfiles";
|
||||
public static final String NOTIFICATION_RULES_CACHE = "notificationRules";
|
||||
public static final String NOTIFICATION_SETTINGS_CACHE = "notificationSettings";
|
||||
|
||||
public static final String ASSET_PROFILE_CACHE = "assetProfiles";
|
||||
|
||||
@ -19,7 +19,6 @@ import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.thingsboard.server.common.data.UpdateMessage;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@ -31,11 +30,22 @@ import static org.thingsboard.server.common.data.util.CollectionsUtil.mapOf;
|
||||
@Builder
|
||||
public class NewPlatformVersionNotificationInfo implements RuleOriginatedNotificationInfo {
|
||||
|
||||
private Map<String, String> message;
|
||||
private String latestVersion;
|
||||
private String latestVersionReleaseNotesUrl;
|
||||
private String upgradeInstructionsUrl;
|
||||
|
||||
private String currentVersion;
|
||||
private String currentVersionReleaseNotesUrl;
|
||||
|
||||
@Override
|
||||
public Map<String, String> getTemplateData() {
|
||||
return message;
|
||||
return mapOf(
|
||||
"latestVersion", latestVersion,
|
||||
"latestVersionReleaseNotesUrl", latestVersionReleaseNotesUrl,
|
||||
"upgradeInstructionsUrl", upgradeInstructionsUrl,
|
||||
"currentVersion", currentVersion,
|
||||
"currentVersionReleaseNotesUrl", currentVersionReleaseNotesUrl
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -20,8 +20,6 @@ import lombok.Data;
|
||||
@Data
|
||||
public class NewPlatformVersionNotificationRuleTriggerConfig implements NotificationRuleTriggerConfig {
|
||||
|
||||
// TODO: don't forget to create default notification configs
|
||||
|
||||
@Override
|
||||
public NotificationRuleTriggerType getTriggerType() {
|
||||
return NotificationRuleTriggerType.NEW_PLATFORM_VERSION;
|
||||
|
||||
@ -20,6 +20,7 @@ import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
|
||||
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
|
||||
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
@ -62,6 +63,10 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
private boolean sslEnabled;
|
||||
@Value("${cloud.rpc.ssl.cert:}")
|
||||
private String certResource;
|
||||
@Value("${cloud.rpc.max_inbound_message_size:4194304}")
|
||||
private int maxInboundMessageSize;
|
||||
@Getter
|
||||
private int serverMaxInboundMessageSize;
|
||||
|
||||
private ManagedChannel channel;
|
||||
|
||||
@ -77,6 +82,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
Consumer<DownlinkMsg> onDownlink,
|
||||
Consumer<Exception> onError) {
|
||||
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(rpcHost, rpcPort)
|
||||
.maxInboundMessageSize(maxInboundMessageSize)
|
||||
.keepAliveTime(keepAliveTimeSec, TimeUnit.SECONDS);
|
||||
if (sslEnabled) {
|
||||
try {
|
||||
@ -101,7 +107,8 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
.setConnectRequestMsg(ConnectRequestMsg.newBuilder()
|
||||
.setEdgeRoutingKey(edgeKey)
|
||||
.setEdgeSecret(edgeSecret)
|
||||
.setEdgeVersion(EdgeVersion.V_3_3_3)
|
||||
.setEdgeVersion(EdgeVersion.V_3_4_0)
|
||||
.setMaxInboundMessageSize(maxInboundMessageSize)
|
||||
.build())
|
||||
.build());
|
||||
}
|
||||
@ -117,6 +124,10 @@ public class EdgeGrpcClient implements EdgeRpcClient {
|
||||
if (responseMsg.hasConnectResponseMsg()) {
|
||||
ConnectResponseMsg connectResponseMsg = responseMsg.getConnectResponseMsg();
|
||||
if (connectResponseMsg.getResponseCode().equals(ConnectResponseCode.ACCEPTED)) {
|
||||
if (connectResponseMsg.hasMaxInboundMessageSize()) {
|
||||
log.debug("[{}] Server max inbound message size: {}", edgeKey, connectResponseMsg.getMaxInboundMessageSize());
|
||||
serverMaxInboundMessageSize = connectResponseMsg.getMaxInboundMessageSize();
|
||||
}
|
||||
log.info("[{}] Configuration received: {}", edgeKey, connectResponseMsg.getConfiguration());
|
||||
onEdgeUpdate.accept(connectResponseMsg.getConfiguration());
|
||||
} else {
|
||||
|
||||
@ -41,4 +41,6 @@ public interface EdgeRpcClient {
|
||||
void sendUplinkMsg(UplinkMsg uplinkMsg);
|
||||
|
||||
void sendDownlinkResponseMsg(DownlinkResponseMsg downlinkResponseMsg);
|
||||
|
||||
int getServerMaxInboundMessageSize();
|
||||
}
|
||||
|
||||
@ -68,6 +68,7 @@ message ConnectRequestMsg {
|
||||
string edgeRoutingKey = 1;
|
||||
string edgeSecret = 2;
|
||||
EdgeVersion edgeVersion = 3;
|
||||
optional int32 maxInboundMessageSize = 4;
|
||||
}
|
||||
|
||||
enum ConnectResponseCode {
|
||||
@ -80,6 +81,7 @@ message ConnectResponseMsg {
|
||||
ConnectResponseCode responseCode = 1;
|
||||
string errorMsg = 2;
|
||||
EdgeConfiguration configuration = 3;
|
||||
optional int32 maxInboundMessageSize = 4;
|
||||
}
|
||||
|
||||
message SyncRequestMsg {
|
||||
|
||||
@ -26,7 +26,7 @@ import org.thingsboard.server.common.data.notification.rule.trigger.Notification
|
||||
@Builder
|
||||
public class NewPlatformVersionTrigger implements NotificationRuleTrigger {
|
||||
|
||||
private final UpdateMessage message;
|
||||
private final UpdateMessage updateInfo;
|
||||
|
||||
@Override
|
||||
public NotificationRuleTriggerType getType() {
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.queue.notification;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||
@ -25,6 +26,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.NotificationsTopicService;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
||||
@ -32,24 +34,33 @@ import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
||||
import java.util.UUID;
|
||||
|
||||
@Service
|
||||
@ConditionalOnMissingBean(NotificationRuleProcessor.class)
|
||||
@ConditionalOnMissingBean(value = NotificationRuleProcessor.class, ignored = RemoteNotificationRuleProcessor.class)
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class RemoteNotificationRuleProcessor implements NotificationRuleProcessor {
|
||||
|
||||
private final TbQueueProducerProvider producerProvider;
|
||||
private final NotificationsTopicService notificationsTopicService;
|
||||
private final PartitionService partitionService;
|
||||
private final DataDecodingEncodingService encodingService;
|
||||
|
||||
@Override
|
||||
public void process(NotificationRuleTrigger trigger) {
|
||||
TransportProtos.NotificationRuleProcessorMsg.Builder msg = TransportProtos.NotificationRuleProcessorMsg.newBuilder()
|
||||
.setTrigger(ByteString.copyFrom(encodingService.encode(trigger)));
|
||||
try {
|
||||
log.trace("Submitting notification rule trigger: {}", trigger);
|
||||
TransportProtos.NotificationRuleProcessorMsg.Builder msg = TransportProtos.NotificationRuleProcessorMsg.newBuilder()
|
||||
.setTrigger(ByteString.copyFrom(encodingService.encode(trigger)));
|
||||
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, trigger.getTenantId(), trigger.getOriginatorEntityId());
|
||||
producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(),
|
||||
TransportProtos.ToCoreMsg.newBuilder()
|
||||
.setNotificationRuleProcessorMsg(msg)
|
||||
.build()), null);
|
||||
partitionService.getAllServiceIds(ServiceType.TB_CORE).stream().findAny().ifPresent(serviceId -> {
|
||||
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
|
||||
producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(),
|
||||
TransportProtos.ToCoreNotificationMsg.newBuilder()
|
||||
.setNotificationRuleProcessorMsg(msg)
|
||||
.build()), null);
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to submit notification rule trigger: {}", trigger, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -598,7 +598,7 @@ public class DefaultCoapClientContext implements CoapClientContext {
|
||||
response.addMessageObserver(new TbCoapMessageObserver(requestId, id -> {
|
||||
TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
|
||||
if (rpcRequestMsg != null) {
|
||||
transportService.process(state.getSession(), rpcRequestMsg, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
|
||||
transportService.process(state.getSession(), rpcRequestMsg, RpcStatus.DELIVERED, true, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
}, id -> {
|
||||
TransportProtos.ToDeviceRpcRequestMsg rpcRequestMsg = transportContext.getRpcAwaitingAck().remove(id);
|
||||
|
||||
@ -65,7 +65,7 @@ public abstract class RpcDownlinkRequestCallbackProxy<R, T> implements DownlinkR
|
||||
|
||||
@Override
|
||||
public void onSuccess(R request, T response) {
|
||||
transportService.process(client.getSession(), this.request, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
|
||||
transportService.process(client.getSession(), this.request, RpcStatus.DELIVERED, true, TransportServiceCallback.EMPTY);
|
||||
sendRpcReplyOnSuccess(response);
|
||||
if (callback != null) {
|
||||
callback.onSuccess(request, response);
|
||||
|
||||
@ -322,7 +322,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();
|
||||
TransportProtos.ToDeviceRpcRequestMsg rpcRequest = rpcAwaitingAck.remove(msgId);
|
||||
if (rpcRequest != null) {
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, TransportServiceCallback.EMPTY);
|
||||
transportService.process(deviceSessionCtx.getSessionInfo(), rpcRequest, RpcStatus.DELIVERED, true, TransportServiceCallback.EMPTY);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
||||
@ -117,6 +117,8 @@ public interface TransportService {
|
||||
|
||||
void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, boolean reportActivity, TransportServiceCallback<Void> callback);
|
||||
|
||||
void process(SessionInfoProto sessionInfo, ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback<Void> callback);
|
||||
|
||||
void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
|
||||
|
||||
@ -647,6 +647,11 @@ public class DefaultTransportService implements TransportService {
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, TransportServiceCallback<Void> callback) {
|
||||
process(sessionInfo, msg, rpcStatus, false, callback);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcRequestMsg msg, RpcStatus rpcStatus, boolean reportActivity, TransportServiceCallback<Void> callback) {
|
||||
TransportProtos.ToDeviceRpcResponseStatusMsg responseMsg = TransportProtos.ToDeviceRpcResponseStatusMsg.newBuilder()
|
||||
.setRequestId(msg.getRequestId())
|
||||
.setRequestIdLSB(msg.getRequestIdLSB())
|
||||
@ -655,7 +660,9 @@ public class DefaultTransportService implements TransportService {
|
||||
.build();
|
||||
|
||||
if (checkLimits(sessionInfo, responseMsg, callback)) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
if (reportActivity) {
|
||||
reportActivityInternal(sessionInfo);
|
||||
}
|
||||
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setRpcResponseStatusMsg(responseMsg).build(),
|
||||
new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, TransportServiceCallback.EMPTY));
|
||||
}
|
||||
|
||||
@ -17,7 +17,6 @@ package org.thingsboard.server.dao.notification;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.HasId;
|
||||
@ -28,43 +27,35 @@ import org.thingsboard.server.common.data.notification.rule.NotificationRuleInfo
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.dao.entity.AbstractCachedEntityService;
|
||||
import org.thingsboard.server.dao.entity.AbstractEntityService;
|
||||
import org.thingsboard.server.dao.entity.EntityDaoService;
|
||||
import org.thingsboard.server.dao.notification.cache.NotificationRuleCacheKey;
|
||||
import org.thingsboard.server.dao.notification.cache.NotificationRuleCacheValue;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class DefaultNotificationRuleService extends AbstractCachedEntityService<NotificationRuleCacheKey, NotificationRuleCacheValue, NotificationRule> implements NotificationRuleService, EntityDaoService {
|
||||
public class DefaultNotificationRuleService extends AbstractEntityService implements NotificationRuleService, EntityDaoService {
|
||||
|
||||
private final NotificationRuleDao notificationRuleDao;
|
||||
|
||||
@Override
|
||||
public NotificationRule saveNotificationRule(TenantId tenantId, NotificationRule notificationRule) {
|
||||
boolean created = notificationRule.getId() == null;
|
||||
if (!created) {
|
||||
if (notificationRule.getId() != null) {
|
||||
NotificationRule oldNotificationRule = findNotificationRuleById(tenantId, notificationRule.getId());
|
||||
if (notificationRule.getTriggerType() != oldNotificationRule.getTriggerType()) {
|
||||
throw new IllegalArgumentException("Notification rule trigger type cannot be updated");
|
||||
}
|
||||
}
|
||||
try {
|
||||
notificationRule = notificationRuleDao.saveAndFlush(tenantId, notificationRule);
|
||||
publishEvictEvent(notificationRule);
|
||||
return notificationRuleDao.saveAndFlush(tenantId, notificationRule);
|
||||
} catch (Exception e) {
|
||||
handleEvictEvent(notificationRule);
|
||||
checkConstraintViolation(e, Map.of(
|
||||
"uq_notification_rule_name", "Notification rule with such name already exists"
|
||||
));
|
||||
throw e;
|
||||
}
|
||||
return notificationRule;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -89,44 +80,17 @@ public class DefaultNotificationRuleService extends AbstractCachedEntityService<
|
||||
|
||||
@Override
|
||||
public List<NotificationRule> findNotificationRulesByTenantIdAndTriggerType(TenantId tenantId, NotificationRuleTriggerType triggerType) {
|
||||
NotificationRuleCacheKey cacheKey = NotificationRuleCacheKey.builder()
|
||||
.tenantId(tenantId)
|
||||
.triggerType(triggerType)
|
||||
.build();
|
||||
return cache.getAndPutInTransaction(cacheKey, () -> NotificationRuleCacheValue.builder()
|
||||
.notificationRules(notificationRuleDao.findByTenantIdAndTriggerType(tenantId, triggerType))
|
||||
.build(), false)
|
||||
.getNotificationRules();
|
||||
return notificationRuleDao.findByTenantIdAndTriggerType(tenantId, triggerType);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
@Override
|
||||
public void deleteNotificationRuleById(TenantId tenantId, NotificationRuleId id) {
|
||||
NotificationRule notificationRule = findNotificationRuleById(tenantId, id);
|
||||
publishEvictEvent(notificationRule);
|
||||
notificationRuleDao.removeById(tenantId, id.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteNotificationRulesByTenantId(TenantId tenantId) {
|
||||
notificationRuleDao.removeByTenantId(tenantId);
|
||||
|
||||
List<NotificationRuleCacheKey> cacheKeys = Arrays.stream(NotificationRuleTriggerType.values())
|
||||
.map(triggerType -> NotificationRuleCacheKey.builder()
|
||||
.tenantId(tenantId)
|
||||
.triggerType(triggerType)
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
cache.evict(cacheKeys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleEvictEvent(NotificationRule notificationRule) {
|
||||
NotificationRuleCacheKey cacheKey = NotificationRuleCacheKey.builder()
|
||||
.tenantId(notificationRule.getTenantId())
|
||||
.triggerType(notificationRule.getTriggerType())
|
||||
.build();
|
||||
cache.evict(cacheKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -46,6 +46,7 @@ import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivi
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityNotificationRuleTriggerConfig.DeviceEvent;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.EntitiesLimitNotificationRuleTriggerConfig;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.EntityActionNotificationRuleTriggerConfig;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NewPlatformVersionNotificationRuleTriggerConfig;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerConfig;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.RuleEngineComponentLifecycleEventNotificationRuleTriggerConfig;
|
||||
@ -144,6 +145,14 @@ public class DefaultNotificationSettingsService implements NotificationSettingsS
|
||||
apiUsageLimitRuleTriggerConfig.setNotifyOn(Set.of(ApiUsageStateValue.WARNING, ApiUsageStateValue.DISABLED));
|
||||
createRule(tenantId, "API usage limit", apiUsageLimitNotificationTemplate.getId(), apiUsageLimitRuleTriggerConfig,
|
||||
List.of(affectedTenantAdmins.getId(), sysAdmins.getId()), "Send notification to tenant admins and system admins when API feature usage state changed");
|
||||
|
||||
NotificationTemplate newPlatformVersionNotificationTemplate = createTemplate(tenantId, "New platform version notification", NotificationType.NEW_PLATFORM_VERSION,
|
||||
"New version <b>${latestVersion}</b> is available",
|
||||
"Current platform version is ${currentVersion}",
|
||||
null, "Open release notes", "${latestVersionReleaseNotesUrl}");
|
||||
NewPlatformVersionNotificationRuleTriggerConfig newPlatformVersionRuleTriggerConfig = new NewPlatformVersionNotificationRuleTriggerConfig();
|
||||
createRule(tenantId, "New platform version", newPlatformVersionNotificationTemplate.getId(), newPlatformVersionRuleTriggerConfig,
|
||||
List.of(sysAdmins.getId(), tenantAdmins.getId()), "Send notification to system admins and tenant admins when new platform version is available");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -48,6 +48,12 @@ public class DefaultNotificationTemplateService extends AbstractEntityService im
|
||||
|
||||
@Override
|
||||
public NotificationTemplate saveNotificationTemplate(TenantId tenantId, NotificationTemplate notificationTemplate) {
|
||||
if (notificationTemplate.getId() != null) {
|
||||
NotificationTemplate oldNotificationTemplate = findNotificationTemplateById(tenantId, notificationTemplate.getId());
|
||||
if (notificationTemplate.getNotificationType() != oldNotificationTemplate.getNotificationType()) {
|
||||
throw new IllegalArgumentException("Notification type cannot be updated");
|
||||
}
|
||||
}
|
||||
try {
|
||||
return notificationTemplateDao.saveAndFlush(tenantId, notificationTemplate);
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -1,43 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.notification.cache;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class NotificationRuleCacheKey implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 5987113265482170L;
|
||||
|
||||
private TenantId tenantId;
|
||||
private NotificationRuleTriggerType triggerType;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return tenantId + "_" + triggerType;
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,32 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.notification.cache;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.cache.CaffeineTbTransactionalCache;
|
||||
import org.thingsboard.server.common.data.CacheConstants;
|
||||
|
||||
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true)
|
||||
@Service
|
||||
public class NotificationRuleCaffeineCache extends CaffeineTbTransactionalCache<NotificationRuleCacheKey, NotificationRuleCacheValue> {
|
||||
|
||||
public NotificationRuleCaffeineCache(CacheManager cacheManager) {
|
||||
super(cacheManager, CacheConstants.NOTIFICATION_RULES_CACHE);
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,35 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.notification.cache;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.cache.CacheSpecsMap;
|
||||
import org.thingsboard.server.cache.RedisTbTransactionalCache;
|
||||
import org.thingsboard.server.cache.TBRedisCacheConfiguration;
|
||||
import org.thingsboard.server.cache.TbFSTRedisSerializer;
|
||||
import org.thingsboard.server.common.data.CacheConstants;
|
||||
|
||||
@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
|
||||
@Service
|
||||
public class NotificationRuleRedisCache extends RedisTbTransactionalCache<NotificationRuleCacheKey, NotificationRuleCacheValue> {
|
||||
|
||||
public NotificationRuleRedisCache(CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory, TBRedisCacheConfiguration configuration) {
|
||||
super(CacheConstants.NOTIFICATION_RULES_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbFSTRedisSerializer<>());
|
||||
}
|
||||
|
||||
}
|
||||
@ -804,7 +804,7 @@ CREATE TABLE IF NOT EXISTS notification_template (
|
||||
tenant_id UUID NOT NULL,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
notification_type VARCHAR(50) NOT NULL,
|
||||
configuration VARCHAR(10000) NOT NULL,
|
||||
configuration VARCHAR(10000000) NOT NULL,
|
||||
CONSTRAINT uq_notification_template_name UNIQUE (tenant_id, name)
|
||||
);
|
||||
|
||||
@ -827,7 +827,7 @@ CREATE TABLE IF NOT EXISTS notification_request (
|
||||
tenant_id UUID NOT NULL,
|
||||
targets VARCHAR(10000) NOT NULL,
|
||||
template_id UUID,
|
||||
template VARCHAR(10000),
|
||||
template VARCHAR(10000000),
|
||||
info VARCHAR(1000),
|
||||
additional_config VARCHAR(1000),
|
||||
originator_entity_id UUID,
|
||||
|
||||
@ -211,7 +211,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
@Test
|
||||
public void testFindAssetsByTenantId() {
|
||||
List<Asset> assets = new ArrayList<>();
|
||||
for (int i=0;i<178;i++) {
|
||||
for (int i=0;i<13;i++) {
|
||||
Asset asset = new Asset();
|
||||
asset.setTenantId(tenantId);
|
||||
asset.setName("Asset"+i);
|
||||
@ -220,7 +220,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
}
|
||||
|
||||
List<Asset> loadedAssets = new ArrayList<>();
|
||||
PageLink pageLink = new PageLink(23);
|
||||
PageLink pageLink = new PageLink(3);
|
||||
PageData<Asset> pageData = null;
|
||||
do {
|
||||
pageData = assetService.findAssetsByTenantId(tenantId, pageLink);
|
||||
@ -237,7 +237,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
|
||||
assetService.deleteAssetsByTenantId(tenantId);
|
||||
|
||||
pageLink = new PageLink(33);
|
||||
pageLink = new PageLink(4);
|
||||
pageData = assetService.findAssetsByTenantId(tenantId, pageLink);
|
||||
Assert.assertFalse(pageData.hasNext());
|
||||
Assert.assertTrue(pageData.getData().isEmpty());
|
||||
@ -247,7 +247,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
public void testFindAssetsByTenantIdAndName() {
|
||||
String title1 = "Asset title 1";
|
||||
List<AssetInfo> assetsTitle1 = new ArrayList<>();
|
||||
for (int i=0;i<143;i++) {
|
||||
for (int i=0;i<13;i++) {
|
||||
Asset asset = new Asset();
|
||||
asset.setTenantId(tenantId);
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
@ -259,7 +259,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
}
|
||||
String title2 = "Asset title 2";
|
||||
List<AssetInfo> assetsTitle2 = new ArrayList<>();
|
||||
for (int i=0;i<175;i++) {
|
||||
for (int i=0;i<17;i++) {
|
||||
Asset asset = new Asset();
|
||||
asset.setTenantId(tenantId);
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
@ -271,7 +271,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
}
|
||||
|
||||
List<AssetInfo> loadedAssetsTitle1 = new ArrayList<>();
|
||||
PageLink pageLink = new PageLink(15, 0, title1);
|
||||
PageLink pageLink = new PageLink(3, 0, title1);
|
||||
PageData<AssetInfo> pageData = null;
|
||||
do {
|
||||
pageData = assetService.findAssetInfosByTenantId(tenantId, pageLink);
|
||||
@ -325,7 +325,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
String title1 = "Asset title 1";
|
||||
String type1 = "typeA";
|
||||
List<Asset> assetsType1 = new ArrayList<>();
|
||||
for (int i=0;i<143;i++) {
|
||||
for (int i=0;i<13;i++) {
|
||||
Asset asset = new Asset();
|
||||
asset.setTenantId(tenantId);
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
@ -338,7 +338,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
String title2 = "Asset title 2";
|
||||
String type2 = "typeB";
|
||||
List<Asset> assetsType2 = new ArrayList<>();
|
||||
for (int i=0;i<175;i++) {
|
||||
for (int i=0;i<17;i++) {
|
||||
Asset asset = new Asset();
|
||||
asset.setTenantId(tenantId);
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
@ -350,7 +350,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
}
|
||||
|
||||
List<Asset> loadedAssetsType1 = new ArrayList<>();
|
||||
PageLink pageLink = new PageLink(15);
|
||||
PageLink pageLink = new PageLink(3);
|
||||
PageData<Asset> pageData = null;
|
||||
do {
|
||||
pageData = assetService.findAssetsByTenantIdAndType(tenantId, type1, pageLink);
|
||||
@ -408,7 +408,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
CustomerId customerId = customer.getId();
|
||||
|
||||
List<AssetInfo> assets = new ArrayList<>();
|
||||
for (int i=0;i<278;i++) {
|
||||
for (int i=0;i<13;i++) {
|
||||
Asset asset = new Asset();
|
||||
asset.setTenantId(tenantId);
|
||||
asset.setName("Asset"+i);
|
||||
@ -418,7 +418,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
}
|
||||
|
||||
List<AssetInfo> loadedAssets = new ArrayList<>();
|
||||
PageLink pageLink = new PageLink(23);
|
||||
PageLink pageLink = new PageLink(3);
|
||||
PageData<AssetInfo> pageData = null;
|
||||
do {
|
||||
pageData = assetService.findAssetInfosByTenantIdAndCustomerId(tenantId, customerId, pageLink);
|
||||
@ -435,7 +435,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
|
||||
assetService.unassignCustomerAssets(tenantId, customerId);
|
||||
|
||||
pageLink = new PageLink(33);
|
||||
pageLink = new PageLink(4);
|
||||
pageData = assetService.findAssetInfosByTenantIdAndCustomerId(tenantId, customerId, pageLink);
|
||||
Assert.assertFalse(pageData.hasNext());
|
||||
Assert.assertTrue(pageData.getData().isEmpty());
|
||||
@ -452,7 +452,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
|
||||
String title1 = "Asset title 1";
|
||||
List<Asset> assetsTitle1 = new ArrayList<>();
|
||||
for (int i=0;i<175;i++) {
|
||||
for (int i=0;i<17;i++) {
|
||||
Asset asset = new Asset();
|
||||
asset.setTenantId(tenantId);
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
@ -465,7 +465,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
}
|
||||
String title2 = "Asset title 2";
|
||||
List<Asset> assetsTitle2 = new ArrayList<>();
|
||||
for (int i=0;i<143;i++) {
|
||||
for (int i=0;i<13;i++) {
|
||||
Asset asset = new Asset();
|
||||
asset.setTenantId(tenantId);
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
@ -478,7 +478,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
}
|
||||
|
||||
List<Asset> loadedAssetsTitle1 = new ArrayList<>();
|
||||
PageLink pageLink = new PageLink(15, 0, title1);
|
||||
PageLink pageLink = new PageLink(3, 0, title1);
|
||||
PageData<Asset> pageData = null;
|
||||
do {
|
||||
pageData = assetService.findAssetsByTenantIdAndCustomerId(tenantId, customerId, pageLink);
|
||||
@ -540,7 +540,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
String title1 = "Asset title 1";
|
||||
String type1 = "typeC";
|
||||
List<Asset> assetsType1 = new ArrayList<>();
|
||||
for (int i=0;i<175;i++) {
|
||||
for (int i=0;i<17;i++) {
|
||||
Asset asset = new Asset();
|
||||
asset.setTenantId(tenantId);
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
@ -554,7 +554,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
String title2 = "Asset title 2";
|
||||
String type2 = "typeD";
|
||||
List<Asset> assetsType2 = new ArrayList<>();
|
||||
for (int i=0;i<143;i++) {
|
||||
for (int i=0;i<13;i++) {
|
||||
Asset asset = new Asset();
|
||||
asset.setTenantId(tenantId);
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
@ -567,7 +567,7 @@ public abstract class BaseAssetServiceTest extends AbstractServiceTest {
|
||||
}
|
||||
|
||||
List<Asset> loadedAssetsType1 = new ArrayList<>();
|
||||
PageLink pageLink = new PageLink(15);
|
||||
PageLink pageLink = new PageLink(3);
|
||||
PageData<Asset> pageData = null;
|
||||
do {
|
||||
pageData = assetService.findAssetsByTenantIdAndCustomerIdAndType(tenantId, customerId, type1, pageLink);
|
||||
|
||||
@ -32,6 +32,7 @@ kafka:
|
||||
linger_ms: "TB_KAFKA_LINGER_MS" # for producer
|
||||
partitions_consumed_concurrently: "TB_KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY" # (EXPERIMENTAL) increase this value if you are planning to handle more than one partition (scale up, scale down) - this will decrease the latency
|
||||
requestTimeout: "TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS"
|
||||
connectionTimeout: "TB_KAFKA_CONNECTION_TIMEOUT_MS"
|
||||
compression: "TB_QUEUE_KAFKA_COMPRESSION" # gzip or uncompressed
|
||||
topic_properties: "TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES"
|
||||
use_confluent_cloud: "TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD"
|
||||
|
||||
@ -32,6 +32,7 @@ kafka:
|
||||
linger_ms: "5" # for producer
|
||||
partitions_consumed_concurrently: "1" # (EXPERIMENTAL) increase this value if you are planning to handle more than one partition (scale up, scale down) - this will decrease the latency
|
||||
requestTimeout: "30000" # The default value in kafkajs is: 30000
|
||||
connectionTimeout: "1000" # The default value in kafkajs is: 1000
|
||||
compression: "gzip" # gzip or uncompressed
|
||||
topic_properties: "retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1"
|
||||
use_confluent_cloud: false
|
||||
|
||||
@ -42,6 +42,7 @@ export class KafkaTemplate implements IQueue {
|
||||
private maxBatchSize = Number(config.get('kafka.batch_size'));
|
||||
private linger = Number(config.get('kafka.linger_ms'));
|
||||
private requestTimeout = Number(config.get('kafka.requestTimeout'));
|
||||
private connectionTimeout = Number(config.get('kafka.connectionTimeout'));
|
||||
private compressionType = (config.get('kafka.compression') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None;
|
||||
private partitionsConsumedConcurrently = Number(config.get('kafka.partitions_consumed_concurrently'));
|
||||
|
||||
@ -80,6 +81,8 @@ export class KafkaTemplate implements IQueue {
|
||||
|
||||
kafkaConfig['requestTimeout'] = this.requestTimeout;
|
||||
|
||||
kafkaConfig['connectionTimeout'] = this.connectionTimeout;
|
||||
|
||||
if (useConfluent) {
|
||||
kafkaConfig['sasl'] = {
|
||||
mechanism: config.get('kafka.confluent.sasl.mechanism') as any,
|
||||
|
||||
@ -15,9 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.mqtt.integration;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.handler.codec.mqtt.MqttMessageType;
|
||||
@ -44,7 +42,7 @@ public class MqttIntegrationTest {
|
||||
|
||||
static final String MQTT_HOST = "localhost";
|
||||
static final int KEEPALIVE_TIMEOUT_SECONDS = 2;
|
||||
static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);
|
||||
static final long RECONNECT_DELAY_SECONDS = 10L;
|
||||
|
||||
EventLoopGroup eventLoopGroup;
|
||||
MqttServer mqttServer;
|
||||
@ -68,7 +66,7 @@ public class MqttIntegrationTest {
|
||||
this.mqttServer.shutdown();
|
||||
}
|
||||
if (this.eventLoopGroup != null) {
|
||||
this.eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
|
||||
this.eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
@ -80,6 +78,7 @@ public class MqttIntegrationTest {
|
||||
log.warn("Sending publish messages...");
|
||||
CountDownLatch latch = new CountDownLatch(3);
|
||||
for (int i = 0; i < 3; i++) {
|
||||
Thread.sleep(30);
|
||||
Future<Void> pubFuture = publishMsg();
|
||||
pubFuture.addListener(future -> latch.countDown());
|
||||
}
|
||||
@ -87,35 +86,30 @@ public class MqttIntegrationTest {
|
||||
log.warn("Waiting for messages acknowledgments...");
|
||||
boolean awaitResult = latch.await(10, TimeUnit.SECONDS);
|
||||
Assert.assertTrue(awaitResult);
|
||||
log.warn("Messages are delivered successfully...");
|
||||
|
||||
//when
|
||||
CountDownLatch keepAliveLatch = new CountDownLatch(1);
|
||||
|
||||
log.warn("Starting idle period...");
|
||||
boolean keepaliveAwaitResult = keepAliveLatch.await(5, TimeUnit.SECONDS);
|
||||
Assert.assertFalse(keepaliveAwaitResult);
|
||||
Thread.sleep(5000);
|
||||
|
||||
//then
|
||||
List<MqttMessageType> allReceivedEvents = this.mqttServer.getEventsFromClient();
|
||||
long pubCount = allReceivedEvents.stream().filter(mqttMessageType -> mqttMessageType == MqttMessageType.PUBLISH).count();
|
||||
long disconnectCount = allReceivedEvents.stream().filter(type -> type == MqttMessageType.DISCONNECT).count();
|
||||
|
||||
Assert.assertEquals(3, pubCount);
|
||||
Assert.assertEquals(1, disconnectCount);
|
||||
}
|
||||
|
||||
private Future<Void> publishMsg() {
|
||||
ByteBuf byteBuf = ALLOCATOR.buffer();
|
||||
byteBuf.writeBytes("payload".getBytes(StandardCharsets.UTF_8));
|
||||
return this.mqttClient.publish(
|
||||
"test/topic",
|
||||
byteBuf,
|
||||
MqttQoS.AT_LEAST_ONCE);
|
||||
Unpooled.wrappedBuffer("payload".getBytes(StandardCharsets.UTF_8)),
|
||||
MqttQoS.AT_MOST_ONCE);
|
||||
}
|
||||
|
||||
private MqttClient initClient() throws Exception {
|
||||
MqttClientConfig config = new MqttClientConfig();
|
||||
config.setTimeoutSeconds(KEEPALIVE_TIMEOUT_SECONDS);
|
||||
config.setReconnectDelay(RECONNECT_DELAY_SECONDS);
|
||||
MqttClient client = MqttClient.create(config, null);
|
||||
client.setEventLoop(this.eventLoopGroup);
|
||||
Future<MqttConnectResult> connectFuture = client.connect(MQTT_HOST, this.mqttServer.getMqttPort());
|
||||
|
||||
@ -35,7 +35,6 @@ import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory;
|
||||
import org.springframework.http.client.Netty4ClientHttpRequestFactory;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
import org.springframework.web.client.AsyncRestTemplate;
|
||||
@ -49,6 +48,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||
import org.thingsboard.rule.engine.credentials.BasicCredentials;
|
||||
import org.thingsboard.rule.engine.credentials.ClientCredentials;
|
||||
import org.thingsboard.rule.engine.credentials.CredentialsType;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
|
||||
@ -192,7 +192,7 @@ public class TbHttpClient {
|
||||
config.isIgnoreRequestBody()) {
|
||||
entity = new HttpEntity<>(headers);
|
||||
} else {
|
||||
entity = new HttpEntity<>(msg.getData(), headers);
|
||||
entity = new HttpEntity<>(getData(msg), headers);
|
||||
}
|
||||
|
||||
URI uri = buildEncodedUri(endpointUrl);
|
||||
@ -243,6 +243,18 @@ public class TbHttpClient {
|
||||
return uri;
|
||||
}
|
||||
|
||||
private String getData(TbMsg msg) {
|
||||
String data = msg.getData();
|
||||
|
||||
if (config.isTrimDoubleQuotes()) {
|
||||
final String dataBefore = data;
|
||||
data = data.replaceAll("^\"|\"$", "");;
|
||||
log.trace("Trimming double quotes. Before trim: [{}], after trim: [{}]", dataBefore, data);
|
||||
}
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
private TbMsg processResponse(TbContext ctx, TbMsg origMsg, ResponseEntity<String> response) {
|
||||
TbMsgMetaData metaData = origMsg.getMetaData();
|
||||
metaData.putValue(STATUS, response.getStatusCode().name());
|
||||
|
||||
@ -37,8 +37,7 @@ public class TbRestApiCallNodeConfiguration implements NodeConfiguration<TbRestA
|
||||
private int readTimeoutMs;
|
||||
private int maxParallelRequestsCount;
|
||||
private boolean useRedisQueueForMsgPersistence;
|
||||
private boolean trimQueue;
|
||||
private int maxQueueSize;
|
||||
private boolean trimDoubleQuotes;
|
||||
private boolean enableProxy;
|
||||
private boolean useSystemProxyProperties;
|
||||
private String proxyHost;
|
||||
@ -59,7 +58,7 @@ public class TbRestApiCallNodeConfiguration implements NodeConfiguration<TbRestA
|
||||
configuration.setReadTimeoutMs(0);
|
||||
configuration.setMaxParallelRequestsCount(0);
|
||||
configuration.setUseRedisQueueForMsgPersistence(false);
|
||||
configuration.setTrimQueue(false);
|
||||
configuration.setTrimDoubleQuotes(false);
|
||||
configuration.setEnableProxy(false);
|
||||
configuration.setCredentials(new AnonymousCredentials());
|
||||
configuration.setIgnoreRequestBody(false);
|
||||
|
||||
@ -478,6 +478,20 @@
|
||||
</section>
|
||||
</form>
|
||||
</mat-step>
|
||||
|
||||
<mat-step *ngIf="ruleNotificationForm.get('triggerType').value === triggerType.NEW_PLATFORM_VERSION"
|
||||
[stepControl]="newPlatformVersionTemplateForm">
|
||||
<ng-template matStepLabel>{{ 'notification.new-platform-version-trigger-settings' | translate }}</ng-template>
|
||||
<form [formGroup]="ruleNotificationForm">
|
||||
<section formGroupName="additionalConfig">
|
||||
<mat-form-field class="mat-block">
|
||||
<mat-label translate>notification.description</mat-label>
|
||||
<input matInput formControlName="description">
|
||||
</mat-form-field>
|
||||
</section>
|
||||
</form>
|
||||
</mat-step>
|
||||
|
||||
</mat-horizontal-stepper>
|
||||
</div>
|
||||
<mat-divider></mat-divider>
|
||||
|
||||
@ -94,6 +94,7 @@ export class RuleNotificationDialogComponent extends
|
||||
ruleEngineEventsTemplateForm: FormGroup;
|
||||
entitiesLimitTemplateForm: FormGroup;
|
||||
apiUsageLimitTemplateForm: FormGroup;
|
||||
newPlatformVersionTemplateForm: FormGroup;
|
||||
|
||||
triggerType = TriggerType;
|
||||
triggerTypes: TriggerType[];
|
||||
@ -294,6 +295,12 @@ export class RuleNotificationDialogComponent extends
|
||||
})
|
||||
});
|
||||
|
||||
this.newPlatformVersionTemplateForm = this.fb.group({
|
||||
triggerConfig: this.fb.group({
|
||||
|
||||
})
|
||||
});
|
||||
|
||||
this.triggerTypeFormsMap = new Map<TriggerType, FormGroup>([
|
||||
[TriggerType.ALARM, this.alarmTemplateForm],
|
||||
[TriggerType.ALARM_COMMENT, this.alarmCommentTemplateForm],
|
||||
@ -302,7 +309,8 @@ export class RuleNotificationDialogComponent extends
|
||||
[TriggerType.ALARM_ASSIGNMENT, this.alarmAssignmentTemplateForm],
|
||||
[TriggerType.RULE_ENGINE_COMPONENT_LIFECYCLE_EVENT, this.ruleEngineEventsTemplateForm],
|
||||
[TriggerType.ENTITIES_LIMIT, this.entitiesLimitTemplateForm],
|
||||
[TriggerType.API_USAGE_LIMIT, this.apiUsageLimitTemplateForm]
|
||||
[TriggerType.API_USAGE_LIMIT, this.apiUsageLimitTemplateForm],
|
||||
[TriggerType.NEW_PLATFORM_VERSION, this.newPlatformVersionTemplateForm]
|
||||
]);
|
||||
|
||||
if (data.isAdd || data.isCopy) {
|
||||
@ -434,10 +442,16 @@ export class RuleNotificationDialogComponent extends
|
||||
}
|
||||
|
||||
private allowTriggerTypes(): TriggerType[] {
|
||||
const sysAdminAllowTriggerTypes = new Set([
|
||||
TriggerType.ENTITIES_LIMIT,
|
||||
TriggerType.API_USAGE_LIMIT,
|
||||
TriggerType.NEW_PLATFORM_VERSION,
|
||||
]);
|
||||
|
||||
if (this.isSysAdmin()) {
|
||||
return [TriggerType.ENTITIES_LIMIT, TriggerType.API_USAGE_LIMIT];
|
||||
return Array.from(sysAdminAllowTriggerTypes);
|
||||
}
|
||||
return Object.values(TriggerType).filter(type => type !== TriggerType.ENTITIES_LIMIT && type !== TriggerType.API_USAGE_LIMIT);
|
||||
return Object.values(TriggerType).filter(type => !sysAdminAllowTriggerTypes.has(type));
|
||||
}
|
||||
|
||||
get allowEntityTypeForEntityAction(): EntityType[] {
|
||||
|
||||
@ -177,10 +177,15 @@ export class TemplateNotificationDialogComponent
|
||||
}
|
||||
|
||||
private allowNotificationType(): NotificationType[] {
|
||||
const sysAdminAllowNotificationTypes = new Set([
|
||||
NotificationType.ENTITIES_LIMIT,
|
||||
NotificationType.API_USAGE_LIMIT,
|
||||
NotificationType.NEW_PLATFORM_VERSION,
|
||||
]);
|
||||
|
||||
if (this.isSysAdmin()) {
|
||||
return [NotificationType.GENERAL, NotificationType.ENTITIES_LIMIT, NotificationType.API_USAGE_LIMIT];
|
||||
return [NotificationType.GENERAL, ...sysAdminAllowNotificationTypes];
|
||||
}
|
||||
return Object.values(NotificationType)
|
||||
.filter(type => type !== NotificationType.ENTITIES_LIMIT && type !== NotificationType.API_USAGE_LIMIT);
|
||||
return Object.values(NotificationType).filter(type => !sysAdminAllowNotificationTypes.has(type));
|
||||
}
|
||||
}
|
||||
|
||||
@ -442,6 +442,7 @@ export enum NotificationType {
|
||||
RULE_ENGINE_COMPONENT_LIFECYCLE_EVENT = 'RULE_ENGINE_COMPONENT_LIFECYCLE_EVENT',
|
||||
ENTITIES_LIMIT = 'ENTITIES_LIMIT',
|
||||
API_USAGE_LIMIT = 'API_USAGE_LIMIT',
|
||||
NEW_PLATFORM_VERSION = 'NEW_PLATFORM_VERSION',
|
||||
RULE_ENGINE = 'RULE_ENGINE'
|
||||
}
|
||||
|
||||
@ -536,6 +537,12 @@ export const NotificationTemplateTypeTranslateMap = new Map<NotificationType, No
|
||||
helpId: 'notification/api_usage_limit'
|
||||
}
|
||||
],
|
||||
[NotificationType.NEW_PLATFORM_VERSION,
|
||||
{
|
||||
name: 'notification.template-type.new-platform-version',
|
||||
helpId: 'notification/new_platform_version'
|
||||
}
|
||||
],
|
||||
[NotificationType.RULE_ENGINE,
|
||||
{
|
||||
name: 'notification.template-type.rule-engine',
|
||||
@ -552,7 +559,8 @@ export enum TriggerType {
|
||||
ALARM_ASSIGNMENT = 'ALARM_ASSIGNMENT',
|
||||
RULE_ENGINE_COMPONENT_LIFECYCLE_EVENT = 'RULE_ENGINE_COMPONENT_LIFECYCLE_EVENT',
|
||||
ENTITIES_LIMIT = 'ENTITIES_LIMIT',
|
||||
API_USAGE_LIMIT = 'API_USAGE_LIMIT'
|
||||
API_USAGE_LIMIT = 'API_USAGE_LIMIT',
|
||||
NEW_PLATFORM_VERSION = 'NEW_PLATFORM_VERSION'
|
||||
}
|
||||
|
||||
export const TriggerTypeTranslationMap = new Map<TriggerType, string>([
|
||||
@ -564,4 +572,5 @@ export const TriggerTypeTranslationMap = new Map<TriggerType, string>([
|
||||
[TriggerType.RULE_ENGINE_COMPONENT_LIFECYCLE_EVENT, 'notification.trigger.rule-engine-lifecycle-event'],
|
||||
[TriggerType.ENTITIES_LIMIT, 'notification.trigger.entities-limit'],
|
||||
[TriggerType.API_USAGE_LIMIT, 'notification.trigger.api-usage-limit'],
|
||||
[TriggerType.NEW_PLATFORM_VERSION, 'notification.trigger.new-platform-version'],
|
||||
]);
|
||||
|
||||
@ -0,0 +1,46 @@
|
||||
#### New platform version notification templatization
|
||||
|
||||
<div class="divider"></div>
|
||||
<br/>
|
||||
|
||||
Notification subject and message fields support templatization. The list of available templatization parameters depends on the template type.
|
||||
See the available types and parameters below:
|
||||
|
||||
Available template parameters:
|
||||
|
||||
* *recipientEmail* - email of the recipient;
|
||||
* *recipientFirstName* - first name of the recipient;
|
||||
* *recipientLastName* - last name of the recipient;
|
||||
* *latestVersion* - the latest platform version available;
|
||||
* *latestVersionReleaseNotesUrl* - release notes link for latest version;
|
||||
* *upgradeInstructionsUrl* - upgrade instructions link for latest version;
|
||||
* *currentVersion* - the current platform version
|
||||
* *currentVersionReleaseNotesUrl* - release notes link for current version;
|
||||
|
||||
Parameter names must be wrapped using `${...}`. For example: `${recipientFirstName}`.
|
||||
You may also modify the value of the parameter with one of the suffixes:
|
||||
|
||||
* `upperCase`, for example - `${recipientFirstName:upperCase}`
|
||||
* `lowerCase`, for example - `${recipientFirstName:lowerCase}`
|
||||
* `capitalize`, for example - `${recipientFirstName:capitalize}`
|
||||
|
||||
<div class="divider"></div>
|
||||
|
||||
##### Examples
|
||||
|
||||
Let's assume that new 3.5.0 version is released but currently deployed version is 3.4.4. The following template:
|
||||
|
||||
```text
|
||||
New version ${latestVersion} is available. Current version is ${currentVersion}
|
||||
{:copy-code}
|
||||
```
|
||||
|
||||
will be transformed to:
|
||||
|
||||
```text
|
||||
New version 3.5.0 is available. Current version is 3.4.4
|
||||
{:copy-code}
|
||||
```
|
||||
|
||||
<br>
|
||||
<br>
|
||||
@ -2761,6 +2761,7 @@
|
||||
"all": "All",
|
||||
"api-feature-hint": "If the field is empty, the trigger will be applied to all api features",
|
||||
"api-usage-trigger-settings": "API usage trigger settings",
|
||||
"new-platform-version-trigger-settings": "New platform version trigger settings",
|
||||
"at-least-one-should-be-selected": "At least one should be selected",
|
||||
"basic-settings": "Basic settings",
|
||||
"button-text": "Button text",
|
||||
@ -2944,7 +2945,8 @@
|
||||
"entity-action": "Entity action",
|
||||
"general": "General",
|
||||
"rule-engine-lifecycle-event": "Rule engine lifecycle event",
|
||||
"rule-engine": "Rule engine"
|
||||
"rule-engine": "Rule engine",
|
||||
"new-platform-version": "New platform version"
|
||||
},
|
||||
"templates": "Templates",
|
||||
"tenant-profiles-list-rule-hint": "If the field is empty, the trigger will be applied to all tenant profiles",
|
||||
@ -2961,6 +2963,7 @@
|
||||
"entities-limit": "Entities limit",
|
||||
"entity-action": "Entity action",
|
||||
"rule-engine-lifecycle-event": "Rule engine lifecycle event",
|
||||
"new-platform-version": "New platform version",
|
||||
"trigger": "Trigger",
|
||||
"trigger-required": "Trigger is required"
|
||||
},
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user