Notification request stats
This commit is contained in:
		
							parent
							
								
									aa89d0e936
								
							
						
					
					
						commit
						4b39d2a221
					
				@ -56,7 +56,8 @@ CREATE TABLE IF NOT EXISTS notification_request (
 | 
			
		||||
    originator_entity_id UUID,
 | 
			
		||||
    originator_entity_type VARCHAR(32),
 | 
			
		||||
    rule_id UUID NULL CONSTRAINT fk_notification_request_rule_id REFERENCES notification_rule(id),
 | 
			
		||||
    status VARCHAR(32)
 | 
			
		||||
    status VARCHAR(32),
 | 
			
		||||
    stats VARCHAR(1000)
 | 
			
		||||
);
 | 
			
		||||
CREATE INDEX IF NOT EXISTS idx_notification_request_tenant_id_originator_type_created_time ON notification_request(tenant_id, originator_type, created_time DESC);
 | 
			
		||||
 | 
			
		||||
@ -78,3 +79,16 @@ CREATE INDEX IF NOT EXISTS idx_notification_notification_request_id ON notificat
 | 
			
		||||
ALTER TABLE alarm ADD COLUMN IF NOT EXISTS notification_rule_id UUID;
 | 
			
		||||
 | 
			
		||||
ALTER TABLE tb_user ADD COLUMN IF NOT EXISTS phone VARCHAR(255);
 | 
			
		||||
 | 
			
		||||
CREATE OR REPLACE FUNCTION on_notification_deleted() RETURNS TRIGGER as $notification_deleted$
 | 
			
		||||
BEGIN
 | 
			
		||||
    RAISE NOTICE 'ABAAAAAAAAAAAAAAAAAAA';
 | 
			
		||||
    INSERT INTO id_and_time values ('13814000-1dd2-11b2-8080-808080808080', 0);
 | 
			
		||||
    RETURN NULL;
 | 
			
		||||
END;
 | 
			
		||||
$notification_deleted$ LANGUAGE plpgsql;
 | 
			
		||||
 | 
			
		||||
CREATE TRIGGER notification_deleted_trigger
 | 
			
		||||
    AFTER DELETE ON id_and_time
 | 
			
		||||
    REFERENCING OLD TABLE AS deleted
 | 
			
		||||
    FOR EACH STATEMENT EXECUTE FUNCTION on_notification_deleted();
 | 
			
		||||
 | 
			
		||||
@ -37,7 +37,6 @@ import org.thingsboard.server.common.data.id.NotificationRequestId;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.Notification;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationOriginatorType;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.dao.notification.NotificationRequestService;
 | 
			
		||||
@ -102,6 +101,7 @@ public class NotificationController extends BaseController {
 | 
			
		||||
        }
 | 
			
		||||
        notificationRequest.setRuleId(null);
 | 
			
		||||
        notificationRequest.setStatus(null);
 | 
			
		||||
        notificationRequest.setStats(null);
 | 
			
		||||
 | 
			
		||||
        return doSaveAndLog(EntityType.NOTIFICATION_REQUEST, notificationRequest, notificationManager::processNotificationRequest);
 | 
			
		||||
    }
 | 
			
		||||
@ -113,13 +113,6 @@ public class NotificationController extends BaseController {
 | 
			
		||||
        return checkEntityId(notificationRequestId, notificationRequestService::findNotificationRequestById, Operation.READ);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @GetMapping("/notification/request/info/{id}")
 | 
			
		||||
    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')")
 | 
			
		||||
    public NotificationRequestInfo getNotificationRequestInfoById(@PathVariable UUID id) throws ThingsboardException {
 | 
			
		||||
        NotificationRequestId notificationRequestId = new NotificationRequestId(id);
 | 
			
		||||
        return checkEntityId(notificationRequestId, notificationRequestService::getNotificationRequestInfoById, Operation.READ);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @GetMapping("/notification/requests")
 | 
			
		||||
    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')")
 | 
			
		||||
    public PageData<NotificationRequest> getNotificationRequests(@RequestParam int pageSize,
 | 
			
		||||
@ -133,8 +126,8 @@ public class NotificationController extends BaseController {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @DeleteMapping("/notification/request/{id}")
 | 
			
		||||
    public void deleteNotificationRequest(@PathVariable UUID id,
 | 
			
		||||
                                          @AuthenticationPrincipal SecurityUser user) throws Exception {
 | 
			
		||||
    @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')")
 | 
			
		||||
    public void deleteNotificationRequest(@PathVariable UUID id) throws Exception {
 | 
			
		||||
        NotificationRequestId notificationRequestId = new NotificationRequestId(id);
 | 
			
		||||
        NotificationRequest notificationRequest = checkEntityId(notificationRequestId, notificationRequestService::findNotificationRequestById, Operation.DELETE);
 | 
			
		||||
        doDeleteAndLog(EntityType.NOTIFICATION_REQUEST, notificationRequest, notificationManager::deleteNotificationRequest);
 | 
			
		||||
 | 
			
		||||
@ -55,7 +55,6 @@ import java.io.IOException;
 | 
			
		||||
import java.net.URI;
 | 
			
		||||
import java.security.InvalidParameterException;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
import java.util.Queue;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
@ -472,7 +471,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private DefaultTenantProfileConfiguration getTenantProfileConfiguration(TelemetryWebSocketSessionRef sessionRef) {
 | 
			
		||||
    private DefaultTenantProfileConfiguration getTenantProfileConfiguration(WebSocketSessionRef sessionRef) {
 | 
			
		||||
        return Optional.ofNullable(tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()))
 | 
			
		||||
                .map(TenantProfile::getDefaultProfileConfiguration).orElse(null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -17,11 +17,12 @@ package org.thingsboard.server.service.notification;
 | 
			
		||||
 | 
			
		||||
import com.google.common.base.Strings;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.context.annotation.Lazy;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.common.util.DonAsynchron;
 | 
			
		||||
import org.thingsboard.rule.engine.api.NotificationManager;
 | 
			
		||||
import org.thingsboard.server.common.data.User;
 | 
			
		||||
import org.thingsboard.server.common.data.id.NotificationId;
 | 
			
		||||
@ -32,6 +33,7 @@ import org.thingsboard.server.common.data.notification.Notification;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationDeliveryMethod;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestConfig;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.template.NotificationText;
 | 
			
		||||
@ -54,18 +56,19 @@ import org.thingsboard.server.service.telemetry.AbstractSubscriptionService;
 | 
			
		||||
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
 | 
			
		||||
import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate;
 | 
			
		||||
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.HashSet;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.concurrent.Future;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
@SuppressWarnings("UnstableApiUsage")
 | 
			
		||||
public class DefaultNotificationManager extends AbstractSubscriptionService implements NotificationManager, NotificationChannel {
 | 
			
		||||
 | 
			
		||||
    private final NotificationTargetService notificationTargetService;
 | 
			
		||||
@ -95,35 +98,58 @@ public class DefaultNotificationManager extends AbstractSubscriptionService impl
 | 
			
		||||
        notificationRequest.setStatus(NotificationRequestStatus.PROCESSED);
 | 
			
		||||
        NotificationRequest savedNotificationRequest = notificationRequestService.saveNotificationRequest(tenantId, notificationRequest);
 | 
			
		||||
 | 
			
		||||
        NotificationRequestStats stats = new NotificationRequestStats();
 | 
			
		||||
        Map<NotificationDeliveryMethod, NotificationTextTemplate> textTemplates = notificationTemplateUtil.getTemplates(tenantId, notificationRequest.getTemplateId(), savedNotificationRequest.getDeliveryMethods());
 | 
			
		||||
        savedNotificationRequest.setTemplateContext(notificationRequest.getTemplateContext());
 | 
			
		||||
 | 
			
		||||
        DaoUtil.processBatches(pageLink -> {
 | 
			
		||||
            return notificationTargetService.findRecipientsForNotificationTarget(tenantId, notificationRequest.getTargetId(), pageLink);
 | 
			
		||||
        }, 100, recipients -> {
 | 
			
		||||
            dbCallbackExecutorService.submit(() -> {
 | 
			
		||||
                for (NotificationDeliveryMethod deliveryMethod : savedNotificationRequest.getDeliveryMethods()) {
 | 
			
		||||
                    log.debug("Sending {} notifications for request {} to recipients batch", deliveryMethod, savedNotificationRequest.getId());
 | 
			
		||||
                    NotificationChannel notificationChannel = channels.get(deliveryMethod);
 | 
			
		||||
                    for (User recipient : recipients) {
 | 
			
		||||
                        Map<String, String> templateContext = createTemplateContext(notificationRequest, recipient);
 | 
			
		||||
                        NotificationText text = notificationTemplateUtil.processTemplate(textTemplates.get(deliveryMethod), templateContext);
 | 
			
		||||
                        notificationChannel.sendNotification(recipient, savedNotificationRequest, text);
 | 
			
		||||
                    }
 | 
			
		||||
        }, 200, recipientsBatch -> {
 | 
			
		||||
            List<ListenableFuture<Void>> results = new ArrayList<>();
 | 
			
		||||
            for (NotificationDeliveryMethod deliveryMethod : savedNotificationRequest.getDeliveryMethods()) {
 | 
			
		||||
                NotificationChannel notificationChannel = channels.get(deliveryMethod);
 | 
			
		||||
                log.debug("Sending {} notifications for request {} to recipients batch", deliveryMethod, savedNotificationRequest.getId());
 | 
			
		||||
 | 
			
		||||
                List<User> recipients = recipientsBatch.getData();
 | 
			
		||||
                NotificationTextTemplate textTemplate = textTemplates.get(deliveryMethod);
 | 
			
		||||
                for (User recipient : recipients) {
 | 
			
		||||
                    ListenableFuture<Void> resultFuture = processForRecipient(recipient, savedNotificationRequest, notificationChannel, textTemplate, stats);
 | 
			
		||||
                    DonAsynchron.withCallback(resultFuture, result -> {
 | 
			
		||||
                        stats.reportSent(deliveryMethod);
 | 
			
		||||
                    }, error -> {
 | 
			
		||||
                        stats.reportError(deliveryMethod, recipient, error);
 | 
			
		||||
                    }, dbCallbackExecutorService);
 | 
			
		||||
                    results.add(resultFuture);
 | 
			
		||||
                }
 | 
			
		||||
            });
 | 
			
		||||
            }
 | 
			
		||||
            Futures.whenAllComplete(results).run(() -> {
 | 
			
		||||
                try {
 | 
			
		||||
                    notificationRequestService.updateNotificationRequestStats(tenantId, savedNotificationRequest.getId(), stats);
 | 
			
		||||
                } catch (Exception e) {
 | 
			
		||||
                    log.error("Failed to update stats for notification request {}", savedNotificationRequest.getId(), e);
 | 
			
		||||
                }
 | 
			
		||||
            }, dbCallbackExecutorService);
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        return savedNotificationRequest;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Map<String, String> createTemplateContext(NotificationRequest notificationRequest, User recipient) {
 | 
			
		||||
        Map<String, String> templateContext = new HashMap<>();
 | 
			
		||||
        templateContext.put("email", recipient.getEmail());
 | 
			
		||||
        templateContext.put("firstName", Strings.nullToEmpty(recipient.getFirstName()));
 | 
			
		||||
        templateContext.put("lastName", Strings.nullToEmpty(recipient.getLastName()));
 | 
			
		||||
        if (notificationRequest.getTemplateContext() != null) {
 | 
			
		||||
            templateContext.putAll(notificationRequest.getTemplateContext());
 | 
			
		||||
    private ListenableFuture<Void> processForRecipient(User recipient, NotificationRequest notificationRequest, NotificationChannel notificationChannel,
 | 
			
		||||
                                                       NotificationTextTemplate textTemplate, NotificationRequestStats stats) {
 | 
			
		||||
        NotificationText text;
 | 
			
		||||
        try {
 | 
			
		||||
            Map<String, String> templateContext = new HashMap<>();
 | 
			
		||||
            templateContext.put("email", recipient.getEmail());
 | 
			
		||||
            templateContext.put("firstName", Strings.nullToEmpty(recipient.getFirstName()));
 | 
			
		||||
            templateContext.put("lastName", Strings.nullToEmpty(recipient.getLastName()));
 | 
			
		||||
            if (notificationRequest.getTemplateContext() != null) {
 | 
			
		||||
                templateContext.putAll(notificationRequest.getTemplateContext());
 | 
			
		||||
            }
 | 
			
		||||
            text = notificationTemplateUtil.processTemplate(textTemplate, templateContext);
 | 
			
		||||
        } catch (Exception e) {
 | 
			
		||||
            return Futures.immediateFailedFuture(e);
 | 
			
		||||
        }
 | 
			
		||||
        return templateContext;
 | 
			
		||||
        return notificationChannel.sendNotification(recipient, notificationRequest, text);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void forwardToNotificationSchedulerService(TenantId tenantId, NotificationRequestId notificationRequestId, boolean deleted) {
 | 
			
		||||
@ -141,7 +167,7 @@ public class DefaultNotificationManager extends AbstractSubscriptionService impl
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Future<Void> sendNotification(User recipient, NotificationRequest request, NotificationText text) {
 | 
			
		||||
    public ListenableFuture<Void> sendNotification(User recipient, NotificationRequest request, NotificationText text) {
 | 
			
		||||
        log.trace("Creating notification for recipient {} (notification request id: {})", recipient.getId(), request.getId());
 | 
			
		||||
        Notification notification = Notification.builder()
 | 
			
		||||
                .requestId(request.getId())
 | 
			
		||||
@ -194,18 +220,17 @@ public class DefaultNotificationManager extends AbstractSubscriptionService impl
 | 
			
		||||
        return notificationRequest;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Future<Void> onNotificationUpdate(TenantId tenantId, UserId recipientId, Notification notification, boolean isNew) {
 | 
			
		||||
    private ListenableFuture<Void> onNotificationUpdate(TenantId tenantId, UserId recipientId, Notification notification, boolean isNew) {
 | 
			
		||||
        NotificationUpdate update = NotificationUpdate.builder()
 | 
			
		||||
                .notification(notification)
 | 
			
		||||
                .isNew(isNew)
 | 
			
		||||
                .build();
 | 
			
		||||
        log.trace("Submitting notification update for recipient {}: {}", recipientId, update);
 | 
			
		||||
        return wsCallBackExecutor.submit(() -> {
 | 
			
		||||
        return Futures.submit(() -> {
 | 
			
		||||
            forwardToSubscriptionManagerService(tenantId, recipientId, subscriptionManagerService -> {
 | 
			
		||||
                subscriptionManagerService.onNotificationUpdate(tenantId, recipientId, update, TbCallback.EMPTY);
 | 
			
		||||
            }, () -> TbSubscriptionUtils.notificationUpdateToProto(tenantId, recipientId, update));
 | 
			
		||||
            return null;
 | 
			
		||||
        });
 | 
			
		||||
        }, wsCallBackExecutor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate update) {
 | 
			
		||||
 | 
			
		||||
@ -15,6 +15,7 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.notification.channels;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.rule.engine.api.MailService;
 | 
			
		||||
@ -24,8 +25,6 @@ import org.thingsboard.server.common.data.notification.NotificationRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.template.NotificationText;
 | 
			
		||||
import org.thingsboard.server.service.mail.MailExecutorService;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.Future;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class EmailNotificationChannel implements NotificationChannel {
 | 
			
		||||
@ -34,7 +33,7 @@ public class EmailNotificationChannel implements NotificationChannel {
 | 
			
		||||
    private final MailExecutorService executor;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Future<Void> sendNotification(User recipient, NotificationRequest request, NotificationText text) {
 | 
			
		||||
    public ListenableFuture<Void> sendNotification(User recipient, NotificationRequest request, NotificationText text) {
 | 
			
		||||
        return executor.submit(() -> {
 | 
			
		||||
            mailService.sendEmail(recipient.getTenantId(), recipient.getEmail(), text.getSubject(), text.getBody());
 | 
			
		||||
            return null;
 | 
			
		||||
 | 
			
		||||
@ -15,16 +15,15 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.notification.channels;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import org.thingsboard.server.common.data.User;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationDeliveryMethod;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.template.NotificationText;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.Future;
 | 
			
		||||
 | 
			
		||||
public interface NotificationChannel {
 | 
			
		||||
 | 
			
		||||
    Future<Void> sendNotification(User recipient, NotificationRequest request, NotificationText text);
 | 
			
		||||
    ListenableFuture<Void> sendNotification(User recipient, NotificationRequest request, NotificationText text);
 | 
			
		||||
 | 
			
		||||
    NotificationDeliveryMethod getDeliveryMethod();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.service.notification.channels;
 | 
			
		||||
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.apache.commons.lang3.StringUtils;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
@ -26,8 +27,6 @@ import org.thingsboard.server.common.data.notification.NotificationRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.template.NotificationText;
 | 
			
		||||
import org.thingsboard.server.service.sms.SmsExecutorService;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.Future;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@RequiredArgsConstructor
 | 
			
		||||
public class SmsNotificationChannel implements NotificationChannel {
 | 
			
		||||
@ -36,9 +35,9 @@ public class SmsNotificationChannel implements NotificationChannel {
 | 
			
		||||
    private final SmsExecutorService executor;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Future<Void> sendNotification(User recipient, NotificationRequest request, NotificationText text) {
 | 
			
		||||
    public ListenableFuture<Void> sendNotification(User recipient, NotificationRequest request, NotificationText text) {
 | 
			
		||||
        String phone = recipient.getPhone();
 | 
			
		||||
        if (StringUtils.isBlank(phone)) return Futures.immediateFuture(null);
 | 
			
		||||
        if (StringUtils.isBlank(phone)) return Futures.immediateFailedFuture(new RuntimeException("User does not have phone number"));
 | 
			
		||||
        return executor.submit(() -> {
 | 
			
		||||
            smsService.sendSms(recipient.getTenantId(), recipient.getCustomerId(), new String[]{phone}, text.getBody());
 | 
			
		||||
            return null;
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,72 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.ttl;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.dao.notification.NotificationDao;
 | 
			
		||||
import org.thingsboard.server.dao.notification.NotificationRequestDao;
 | 
			
		||||
import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository;
 | 
			
		||||
import org.thingsboard.server.queue.discovery.PartitionService;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
import static org.thingsboard.server.dao.model.ModelConstants.NOTIFICATION_TABLE_NAME;
 | 
			
		||||
 | 
			
		||||
@Service
 | 
			
		||||
@ConditionalOnExpression("${sql.ttl.notifications.enabled:true} && ${sql.ttl.notifications.ttl:0} > 0")
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class NotificationsCleanUpService extends AbstractCleanUpService {
 | 
			
		||||
 | 
			
		||||
    private final NotificationDao notificationDao;
 | 
			
		||||
    private final NotificationRequestDao notificationRequestDao;
 | 
			
		||||
    private final SqlPartitioningRepository partitioningRepository;
 | 
			
		||||
 | 
			
		||||
    @Value("${sql.ttl.notifications.ttl:2592000}")
 | 
			
		||||
    private long ttlInSec;
 | 
			
		||||
    @Value("${sql.notifications.partition_size:168}")
 | 
			
		||||
    private int partitionSizeInHours;
 | 
			
		||||
 | 
			
		||||
    public NotificationsCleanUpService(PartitionService partitionService,
 | 
			
		||||
                                       NotificationDao notificationDao,
 | 
			
		||||
                                       NotificationRequestDao notificationRequestDao,
 | 
			
		||||
                                       SqlPartitioningRepository partitioningRepository) {
 | 
			
		||||
        super(partitionService);
 | 
			
		||||
        this.notificationDao = notificationDao;
 | 
			
		||||
        this.notificationRequestDao = notificationRequestDao;
 | 
			
		||||
        this.partitioningRepository = partitioningRepository;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.notifications.checking_interval_ms:86400000})}",
 | 
			
		||||
            fixedDelayString = "${sql.ttl.notifications.checking_interval_ms:86400000}")
 | 
			
		||||
    public void cleanUp() {
 | 
			
		||||
        long expTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec);
 | 
			
		||||
        long partitionDurationMs = TimeUnit.HOURS.toMillis(partitionSizeInHours);
 | 
			
		||||
        if (isSystemTenantPartitionMine()) {
 | 
			
		||||
            long actualExpTime = partitioningRepository.getLastPartitionEnd(NOTIFICATION_TABLE_NAME, expTime, partitionDurationMs);
 | 
			
		||||
            
 | 
			
		||||
            partitioningRepository.dropPartitionsBefore(NOTIFICATION_TABLE_NAME, expTime, partitionDurationMs);
 | 
			
		||||
            // select distinct request_id for period and manually delete them ?
 | 
			
		||||
            // sql trigger ? ----
 | 
			
		||||
        } else {
 | 
			
		||||
            partitioningRepository.cleanupPartitionsCache(NOTIFICATION_TABLE_NAME, expTime, partitionDurationMs);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -952,7 +952,7 @@ public class DefaultWebSocketService implements WebSocketService {
 | 
			
		||||
        return limit == 0 ? DEFAULT_LIMIT : limit;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private DefaultTenantProfileConfiguration getTenantProfileConfiguration(TelemetryWebSocketSessionRef sessionRef) {
 | 
			
		||||
    private DefaultTenantProfileConfiguration getTenantProfileConfiguration(WebSocketSessionRef sessionRef) {
 | 
			
		||||
        return Optional.ofNullable(tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId()))
 | 
			
		||||
                .map(TenantProfile::getDefaultProfileConfiguration).orElse(null);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -278,7 +278,7 @@ sql:
 | 
			
		||||
  audit_logs:
 | 
			
		||||
    partition_size: "${SQL_AUDIT_LOGS_PARTITION_SIZE_HOURS:168}" # Default value - 1 week
 | 
			
		||||
  notifications:
 | 
			
		||||
    partition_size: "${SQL_NOTIFICATIONS_PARTITION_SIZE_HOURS:168}"
 | 
			
		||||
    partition_size: "${SQL_NOTIFICATIONS_PARTITION_SIZE_HOURS:168}" # Default value - 1 week
 | 
			
		||||
  # Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks
 | 
			
		||||
  batch_sort: "${SQL_BATCH_SORT:true}"
 | 
			
		||||
  # Specify whether to remove null characters from strValue of attributes and timeseries before insert
 | 
			
		||||
@ -321,6 +321,10 @@ sql:
 | 
			
		||||
      enabled: "${SQL_TTL_AUDIT_LOGS_ENABLED:true}"
 | 
			
		||||
      ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. Accuracy of the cleanup depends on the sql.audit_logs.partition_size
 | 
			
		||||
      checking_interval_ms: "${SQL_TTL_AUDIT_LOGS_CHECKING_INTERVAL_MS:86400000}" # Default value - 1 day
 | 
			
		||||
    notifications:
 | 
			
		||||
      enabled: "${SQL_TTL_NOTIFICATIONS_ENABLED:true}"
 | 
			
		||||
      ttl: "${SQL_TTL_NOTIFICATIONS_SECS:2592000}" # Default value - 30 days
 | 
			
		||||
      checking_interval_ms: "${SQL_TTL_NOTIFICATIONS_CHECKING_INTERVAL_MS:86400000}" # Default value - 1 day
 | 
			
		||||
  relations:
 | 
			
		||||
    max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # This value has to be reasonable small to prevent infinite recursion as early as possible
 | 
			
		||||
    pool_size: "${SQL_RELATIONS_POOL_SIZE:4}" # This value has to be reasonable small to prevent relation query blocking all other DB calls
 | 
			
		||||
 | 
			
		||||
@ -48,12 +48,12 @@ import org.thingsboard.server.common.data.query.KeyFilter;
 | 
			
		||||
import org.thingsboard.server.common.data.query.NumericFilterPredicate;
 | 
			
		||||
import org.thingsboard.server.common.data.query.SingleEntityFilter;
 | 
			
		||||
import org.thingsboard.server.common.data.query.TsValue;
 | 
			
		||||
import org.thingsboard.server.service.subscription.SubscriptionErrorCode;
 | 
			
		||||
import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
 | 
			
		||||
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataUpdate;
 | 
			
		||||
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
 | 
			
		||||
 | 
			
		||||
import java.util.Arrays;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
 | 
			
		||||
@ -29,7 +29,7 @@ import org.thingsboard.server.common.data.query.EntityDataQuery;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityFilter;
 | 
			
		||||
import org.thingsboard.server.common.data.query.EntityKey;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.cmd.TelemetryPluginCmdsWrapper;
 | 
			
		||||
import org.thingsboard.server.service.telemetry.cmd.v1.AttributesSubscriptionCmd;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.cmd.v1.AttributesSubscriptionCmd;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountCmd;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate;
 | 
			
		||||
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityDataCmd;
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.service.notification;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.core.type.TypeReference;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.assertj.core.data.Offset;
 | 
			
		||||
import org.java_websocket.client.WebSocketClient;
 | 
			
		||||
import org.junit.Before;
 | 
			
		||||
@ -31,6 +32,7 @@ import org.thingsboard.server.common.data.notification.NotificationDeliveryMetho
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestConfig;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.targets.NotificationTarget;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.targets.SingleUserNotificationTargetConfig;
 | 
			
		||||
@ -49,17 +51,16 @@ import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsUpd
 | 
			
		||||
 | 
			
		||||
import java.net.URISyntaxException;
 | 
			
		||||
import java.util.HashMap;
 | 
			
		||||
import java.util.LinkedList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.assertj.core.api.Assertions.not;
 | 
			
		||||
import static org.awaitility.Awaitility.await;
 | 
			
		||||
 | 
			
		||||
@DaoSqlTest
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class NotificationApiTest extends AbstractControllerTest {
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
@ -251,9 +252,10 @@ public class NotificationApiTest extends AbstractControllerTest {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testNotificationUpdatesForUsersInTarget() throws Exception {
 | 
			
		||||
        Map<User, NotificationApiWsClient> wsSessions = createUsersAndSetUpWsSessions(100);
 | 
			
		||||
        wsSessions.forEach((user, wsClient) -> {
 | 
			
		||||
    public void testNotificationUpdatesForALotOfUsers() throws Exception {
 | 
			
		||||
        int usersCount = 100;
 | 
			
		||||
        Map<User, NotificationApiWsClient> wsSessions = createUsersAndSetUpWsSessions(usersCount);
 | 
			
		||||
        wsSessions.values().forEach(wsClient -> {
 | 
			
		||||
            wsClient.subscribeForUnreadNotifications(10);
 | 
			
		||||
            wsClient.waitForReply(true);
 | 
			
		||||
            wsClient.subscribeForUnreadNotificationsCount();
 | 
			
		||||
@ -263,14 +265,15 @@ public class NotificationApiTest extends AbstractControllerTest {
 | 
			
		||||
        NotificationTarget notificationTarget = new NotificationTarget();
 | 
			
		||||
        UserListNotificationTargetConfig config = new UserListNotificationTargetConfig();
 | 
			
		||||
        config.setUsersIds(wsSessions.keySet().stream().map(User::getUuidId).collect(Collectors.toList()));
 | 
			
		||||
        notificationTarget.setName("100 users");
 | 
			
		||||
        notificationTarget.setName("Test users");
 | 
			
		||||
        notificationTarget.setTenantId(tenantId);
 | 
			
		||||
        notificationTarget.setConfiguration(config);
 | 
			
		||||
        notificationTarget = saveNotificationTarget(notificationTarget);
 | 
			
		||||
 | 
			
		||||
        wsSessions.forEach((user, wsClient) -> wsClient.registerWaitForUpdate(2));
 | 
			
		||||
        NotificationRequest notificationRequest = submitNotificationRequest(notificationTarget.getId(), "Hello, ${email}");
 | 
			
		||||
        await().atMost(5, TimeUnit.SECONDS)
 | 
			
		||||
        NotificationRequest notificationRequest = submitNotificationRequest(notificationTarget.getId(), "Hello, ${email}",
 | 
			
		||||
                NotificationDeliveryMethod.WEBSOCKET);
 | 
			
		||||
        await().atMost(20, TimeUnit.SECONDS)
 | 
			
		||||
                .pollDelay(1, TimeUnit.SECONDS).pollInterval(500, TimeUnit.MILLISECONDS)
 | 
			
		||||
                .until(() -> wsSessions.values().stream()
 | 
			
		||||
                        .allMatch(wsClient -> wsClient.getLastDataUpdate() != null
 | 
			
		||||
@ -285,21 +288,23 @@ public class NotificationApiTest extends AbstractControllerTest {
 | 
			
		||||
            assertThat(notification.getRequestId()).isEqualTo(notificationRequest.getId());
 | 
			
		||||
            assertThat(notification.getText()).isEqualTo("Hello, " + user.getEmail());
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        await().atMost(2, TimeUnit.SECONDS)
 | 
			
		||||
                .until(() -> findNotificationRequest(notificationRequest.getId()).getStats() != null);
 | 
			
		||||
        NotificationRequestStats stats = findNotificationRequest(notificationRequest.getId()).getStats();
 | 
			
		||||
        assertThat(stats.getSent().get(NotificationDeliveryMethod.WEBSOCKET)).hasValue(usersCount);
 | 
			
		||||
 | 
			
		||||
        wsSessions.values().forEach(WebSocketClient::close);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Map<User, NotificationApiWsClient> createUsersAndSetUpWsSessions(int count) throws Exception {
 | 
			
		||||
        List<User> users = new LinkedList<>();
 | 
			
		||||
        Map<User, NotificationApiWsClient> wsSessions = new HashMap<>();
 | 
			
		||||
        for (int i = 1; i <= count; i++) {
 | 
			
		||||
            User user = new User();
 | 
			
		||||
            user.setTenantId(tenantId);
 | 
			
		||||
            user.setAuthority(Authority.TENANT_ADMIN);
 | 
			
		||||
            user.setEmail("test-user-" + i + "@thingsboard.org");
 | 
			
		||||
            users.add(createUser(user, "12345678"));
 | 
			
		||||
        }
 | 
			
		||||
        Map<User, NotificationApiWsClient> wsSessions = new HashMap<>();
 | 
			
		||||
        for (User user : users) {
 | 
			
		||||
            login(user.getEmail(), "12345678");
 | 
			
		||||
            user = createUserAndLogin(user, "12345678");
 | 
			
		||||
            NotificationApiWsClient wsClient = (NotificationApiWsClient) buildAndConnectWebSocketClient();
 | 
			
		||||
            wsSessions.put(user, wsClient);
 | 
			
		||||
        }
 | 
			
		||||
@ -307,6 +312,27 @@ public class NotificationApiTest extends AbstractControllerTest {
 | 
			
		||||
        return wsSessions;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testNotificationRequestStats() throws Exception {
 | 
			
		||||
        getWsClient().subscribeForUnreadNotifications(10);
 | 
			
		||||
        getWsClient().waitForReply(true);
 | 
			
		||||
 | 
			
		||||
        getWsClient().registerWaitForUpdate();
 | 
			
		||||
        NotificationTarget notificationTarget = createNotificationTarget(tenantAdminUserId);
 | 
			
		||||
        NotificationRequest notificationRequest = submitNotificationRequest(notificationTarget.getId(), "Test :)",
 | 
			
		||||
                NotificationDeliveryMethod.WEBSOCKET, NotificationDeliveryMethod.EMAIL, NotificationDeliveryMethod.SMS);
 | 
			
		||||
        getWsClient().waitForUpdate();
 | 
			
		||||
 | 
			
		||||
        await().atMost(2, TimeUnit.SECONDS)
 | 
			
		||||
                .until(() -> findNotificationRequest(notificationRequest.getId()).getStats() != null);
 | 
			
		||||
        NotificationRequestStats stats = findNotificationRequest(notificationRequest.getId()).getStats();
 | 
			
		||||
 | 
			
		||||
        assertThat(stats.getSent().get(NotificationDeliveryMethod.WEBSOCKET)).hasValue(1);
 | 
			
		||||
        assertThat(stats.getSent().get(NotificationDeliveryMethod.EMAIL)).hasValue(1);
 | 
			
		||||
        assertThat(stats.getErrors().get(NotificationDeliveryMethod.SMS)).size().isOne();
 | 
			
		||||
        System.err.println(stats);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    private void checkFullNotificationsUpdate(UnreadNotificationsUpdate notificationsUpdate, String... expectedNotifications) {
 | 
			
		||||
        assertThat(notificationsUpdate.getNotifications()).extracting(Notification::getText).containsOnly(expectedNotifications);
 | 
			
		||||
@ -332,11 +358,11 @@ public class NotificationApiTest extends AbstractControllerTest {
 | 
			
		||||
        return doPost("/api/notification/target", notificationTarget, NotificationTarget.class);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private NotificationRequest submitNotificationRequest(NotificationTargetId targetId, String text) {
 | 
			
		||||
        return submitNotificationRequest(targetId, text, 0);
 | 
			
		||||
    private NotificationRequest submitNotificationRequest(NotificationTargetId targetId, String text, NotificationDeliveryMethod... deliveryMethods) {
 | 
			
		||||
        return submitNotificationRequest(targetId, text, 0, deliveryMethods);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private NotificationRequest submitNotificationRequest(NotificationTargetId targetId, String text, int delayInSec) {
 | 
			
		||||
    private NotificationRequest submitNotificationRequest(NotificationTargetId targetId, String text, int delayInSec, NotificationDeliveryMethod... deliveryMethods) {
 | 
			
		||||
        NotificationTemplate notificationTemplate = createNotificationTemplate(text);
 | 
			
		||||
        NotificationRequestConfig config = new NotificationRequestConfig();
 | 
			
		||||
        config.setSendingDelayInSec(delayInSec);
 | 
			
		||||
@ -348,7 +374,7 @@ public class NotificationApiTest extends AbstractControllerTest {
 | 
			
		||||
                .type("Test")
 | 
			
		||||
                .templateId(notificationTemplate.getId())
 | 
			
		||||
                .info(notificationInfo)
 | 
			
		||||
                .deliveryMethods(List.of(NotificationDeliveryMethod.WEBSOCKET))
 | 
			
		||||
                .deliveryMethods(deliveryMethods.length > 0 ? List.of(deliveryMethods) : List.of(NotificationDeliveryMethod.WEBSOCKET))
 | 
			
		||||
                .additionalConfig(config)
 | 
			
		||||
                .build();
 | 
			
		||||
        return doPost("/api/notification/request", notificationRequest, NotificationRequest.class);
 | 
			
		||||
 | 
			
		||||
@ -20,7 +20,7 @@ import org.thingsboard.server.common.data.id.NotificationRequestId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.NotificationRuleId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
 | 
			
		||||
@ -40,6 +40,6 @@ public interface NotificationRequestService {
 | 
			
		||||
 | 
			
		||||
    PageData<NotificationRequest> findScheduledNotificationRequests(PageLink pageLink);
 | 
			
		||||
 | 
			
		||||
    NotificationRequestInfo getNotificationRequestInfoById(TenantId tenantId, NotificationRequestId id);
 | 
			
		||||
    void updateNotificationRequestStats(TenantId tenantId, NotificationRequestId notificationRequestId, NotificationRequestStats stats);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -87,6 +87,8 @@ public class EntityIdFactory {
 | 
			
		||||
                return new NotificationRequestId(uuid);
 | 
			
		||||
            case NOTIFICATION_RULE:
 | 
			
		||||
                return new NotificationRuleId(uuid);
 | 
			
		||||
            case NOTIFICATION_TEMPLATE:
 | 
			
		||||
                return new NotificationTemplateId(uuid);
 | 
			
		||||
        }
 | 
			
		||||
        throw new IllegalArgumentException("EntityType " + type + " is not supported!");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.common.data.notification;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonIgnore;
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonProperty;
 | 
			
		||||
import lombok.AllArgsConstructor;
 | 
			
		||||
import lombok.Builder;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
@ -38,6 +39,8 @@ import javax.validation.constraints.NotNull;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
 | 
			
		||||
import static com.fasterxml.jackson.annotation.JsonProperty.Access.READ_ONLY;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@EqualsAndHashCode(callSuper = true)
 | 
			
		||||
@NoArgsConstructor
 | 
			
		||||
@ -67,25 +70,10 @@ public class NotificationRequest extends BaseData<NotificationRequestId> impleme
 | 
			
		||||
 | 
			
		||||
    private NotificationRequestStatus status;
 | 
			
		||||
 | 
			
		||||
    private NotificationRequestStats stats;
 | 
			
		||||
    @JsonIgnore
 | 
			
		||||
    private transient Map<String, String> templateContext;
 | 
			
		||||
 | 
			
		||||
    public NotificationRequest(NotificationRequest other) {
 | 
			
		||||
        super(other);
 | 
			
		||||
        this.tenantId = other.tenantId;
 | 
			
		||||
        this.targetId = other.targetId;
 | 
			
		||||
        this.type = other.type;
 | 
			
		||||
        this.templateId = other.templateId;
 | 
			
		||||
        this.info = other.info;
 | 
			
		||||
        this.deliveryMethods = other.deliveryMethods;
 | 
			
		||||
        this.additionalConfig = other.additionalConfig;
 | 
			
		||||
        this.originatorType = other.originatorType;
 | 
			
		||||
        this.originatorEntityId = other.originatorEntityId;
 | 
			
		||||
        this.ruleId = other.ruleId;
 | 
			
		||||
        this.status = other.status;
 | 
			
		||||
        this.templateContext = other.templateContext;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @JsonIgnore
 | 
			
		||||
    @Override
 | 
			
		||||
    public String getName() {
 | 
			
		||||
 | 
			
		||||
@ -1,35 +0,0 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.data.notification;
 | 
			
		||||
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import lombok.EqualsAndHashCode;
 | 
			
		||||
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
@EqualsAndHashCode(callSuper = true)
 | 
			
		||||
public class NotificationRequestInfo extends NotificationRequest {
 | 
			
		||||
 | 
			
		||||
    private int sent;
 | 
			
		||||
    private int read;
 | 
			
		||||
    private Map<String, NotificationStatus> statusesByRecipient;
 | 
			
		||||
 | 
			
		||||
    public NotificationRequestInfo(NotificationRequest notificationRequest) {
 | 
			
		||||
        super(notificationRequest);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -0,0 +1,54 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.data.notification;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonCreator;
 | 
			
		||||
import com.fasterxml.jackson.annotation.JsonProperty;
 | 
			
		||||
import lombok.Data;
 | 
			
		||||
import org.thingsboard.server.common.data.User;
 | 
			
		||||
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.concurrent.ConcurrentHashMap;
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
 | 
			
		||||
@Data
 | 
			
		||||
public class NotificationRequestStats {
 | 
			
		||||
 | 
			
		||||
    private final Map<NotificationDeliveryMethod, AtomicInteger> sent;
 | 
			
		||||
    private final Map<NotificationDeliveryMethod, Map<String, String>> errors;
 | 
			
		||||
 | 
			
		||||
    public NotificationRequestStats() {
 | 
			
		||||
        this.sent = new ConcurrentHashMap<>();
 | 
			
		||||
        this.errors = new ConcurrentHashMap<>();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @JsonCreator
 | 
			
		||||
    public NotificationRequestStats(@JsonProperty("sent") Map<NotificationDeliveryMethod, AtomicInteger> sent,
 | 
			
		||||
                                    @JsonProperty("errors") Map<NotificationDeliveryMethod, Map<String, String>> errors) {
 | 
			
		||||
        this.sent = sent;
 | 
			
		||||
        this.errors = errors;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void reportSent(NotificationDeliveryMethod deliveryMethod) {
 | 
			
		||||
        sent.computeIfAbsent(deliveryMethod, k -> new AtomicInteger()).incrementAndGet();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void reportError(NotificationDeliveryMethod deliveryMethod, User recipient, Throwable error) {
 | 
			
		||||
        String errorMessage = error.getMessage();
 | 
			
		||||
        errors.computeIfAbsent(deliveryMethod, k -> new ConcurrentHashMap<>()).put(recipient.getEmail(), errorMessage);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@ -1,3 +1,18 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2022 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.common.data.util;
 | 
			
		||||
 | 
			
		||||
@FunctionalInterface
 | 
			
		||||
 | 
			
		||||
@ -110,17 +110,17 @@ public abstract class DaoUtil {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static <T> void processInBatches(Function<PageLink, PageData<T>> finder, int batchSize, Consumer<T> processor) {
 | 
			
		||||
       processBatches(finder, batchSize, batch -> batch.forEach(processor));
 | 
			
		||||
       processBatches(finder, batchSize, batch -> batch.getData().forEach(processor));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public static <T> void processBatches(Function<PageLink, PageData<T>> finder, int batchSize, Consumer<List<T>> processor) {
 | 
			
		||||
    public static <T> void processBatches(Function<PageLink, PageData<T>> finder, int batchSize, Consumer<PageData<T>> processor) {
 | 
			
		||||
        PageLink pageLink = new PageLink(batchSize);
 | 
			
		||||
        PageData<T> batch;
 | 
			
		||||
 | 
			
		||||
        boolean hasNextBatch;
 | 
			
		||||
        do {
 | 
			
		||||
            batch = finder.apply(pageLink);
 | 
			
		||||
            processor.accept(batch.getData());
 | 
			
		||||
            processor.accept(batch);
 | 
			
		||||
 | 
			
		||||
            hasNextBatch = batch.hasNext();
 | 
			
		||||
            pageLink = pageLink.nextPageLink();
 | 
			
		||||
 | 
			
		||||
@ -673,6 +673,7 @@ public class ModelConstants {
 | 
			
		||||
    public static final String NOTIFICATION_REQUEST_ADDITIONAL_CONFIG_PROPERTY = "additional_config";
 | 
			
		||||
    public static final String NOTIFICATION_REQUEST_STATUS_PROPERTY = "status";
 | 
			
		||||
    public static final String NOTIFICATION_REQUEST_RULE_ID_PROPERTY = "rule_id";
 | 
			
		||||
    public static final String NOTIFICATION_REQUEST_STATS_PROPERTY = "stats";
 | 
			
		||||
 | 
			
		||||
    public static final String NOTIFICATION_RULE_TABLE_NAME = "notification_rule";
 | 
			
		||||
    public static final String NOTIFICATION_RULE_TEMPLATE_ID_PROPERTY = "template_id";
 | 
			
		||||
 | 
			
		||||
@ -33,6 +33,7 @@ import org.thingsboard.server.common.data.notification.NotificationInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationOriginatorType;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestConfig;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
 | 
			
		||||
import org.thingsboard.server.dao.model.BaseSqlEntity;
 | 
			
		||||
import org.thingsboard.server.dao.model.ModelConstants;
 | 
			
		||||
@ -95,6 +96,10 @@ public class NotificationRequestEntity extends BaseSqlEntity<NotificationRequest
 | 
			
		||||
    @Column(name = ModelConstants.NOTIFICATION_REQUEST_STATUS_PROPERTY)
 | 
			
		||||
    private NotificationRequestStatus status;
 | 
			
		||||
 | 
			
		||||
    @Type(type = "json")
 | 
			
		||||
    @Column(name = ModelConstants.NOTIFICATION_REQUEST_STATS_PROPERTY)
 | 
			
		||||
    private JsonNode stats;
 | 
			
		||||
 | 
			
		||||
    public NotificationRequestEntity() {}
 | 
			
		||||
 | 
			
		||||
    public NotificationRequestEntity(NotificationRequest notificationRequest) {
 | 
			
		||||
@ -114,6 +119,7 @@ public class NotificationRequestEntity extends BaseSqlEntity<NotificationRequest
 | 
			
		||||
        }
 | 
			
		||||
        setRuleId(getUuid(notificationRequest.getRuleId()));
 | 
			
		||||
        setStatus(notificationRequest.getStatus());
 | 
			
		||||
        setStats(toJson(notificationRequest.getStats()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
@ -137,6 +143,7 @@ public class NotificationRequestEntity extends BaseSqlEntity<NotificationRequest
 | 
			
		||||
        }
 | 
			
		||||
        notificationRequest.setRuleId(createId(ruleId, NotificationRuleId::new));
 | 
			
		||||
        notificationRequest.setStatus(status);
 | 
			
		||||
        notificationRequest.setStats(fromJson(stats, NotificationRequestStats.class));
 | 
			
		||||
        return notificationRequest;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -23,7 +23,7 @@ import org.thingsboard.server.common.data.id.NotificationRequestId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.NotificationRuleId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
@ -73,8 +73,8 @@ public class DefaultNotificationRequestService implements NotificationRequestSer
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public NotificationRequestInfo getNotificationRequestInfoById(TenantId tenantId, NotificationRequestId id) {
 | 
			
		||||
        return notificationRequestDao.getNotificationRequestInfoById(tenantId, id);
 | 
			
		||||
    public void updateNotificationRequestStats(TenantId tenantId, NotificationRequestId notificationRequestId, NotificationRequestStats stats) {
 | 
			
		||||
        notificationRequestDao.updateStatsById(tenantId, notificationRequestId, stats);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -20,7 +20,7 @@ import org.thingsboard.server.common.data.id.NotificationRequestId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.NotificationRuleId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
@ -36,6 +36,6 @@ public interface NotificationRequestDao extends Dao<NotificationRequest> {
 | 
			
		||||
 | 
			
		||||
    PageData<NotificationRequest> findAllByStatus(NotificationRequestStatus status, PageLink pageLink);
 | 
			
		||||
 | 
			
		||||
    NotificationRequestInfo getNotificationRequestInfoById(TenantId tenantId, NotificationRequestId id);
 | 
			
		||||
    void updateStatsById(TenantId tenantId, NotificationRequestId notificationRequestId, NotificationRequestStats stats);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -20,13 +20,11 @@ import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.data.jpa.repository.JpaRepository;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.server.common.data.id.NotificationId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.NotificationRequestId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.UserId;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.Notification;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
@ -58,7 +56,6 @@ public class JpaNotificationDao extends JpaAbstractDao<NotificationEntity, Notif
 | 
			
		||||
            UUID uuid = Uuids.timeBased();
 | 
			
		||||
            notification.setId(new NotificationId(uuid));
 | 
			
		||||
            notification.setCreatedTime(Uuids.unixTimestamp(uuid));
 | 
			
		||||
            // todo: regarding ttl, it might be better to remove notifications on NotificationRequest level
 | 
			
		||||
            partitioningRepository.createPartitionIfNotExists(ModelConstants.NOTIFICATION_TABLE_NAME,
 | 
			
		||||
                    notification.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -19,16 +19,15 @@ import com.google.common.base.Strings;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import org.springframework.data.jpa.repository.JpaRepository;
 | 
			
		||||
import org.springframework.stereotype.Component;
 | 
			
		||||
import org.springframework.transaction.annotation.Transactional;
 | 
			
		||||
import org.thingsboard.common.util.JacksonUtil;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.id.EntityId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.NotificationRequestId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.NotificationRuleId;
 | 
			
		||||
import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequest;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestInfo;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationStatus;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageData;
 | 
			
		||||
import org.thingsboard.server.common.data.page.PageLink;
 | 
			
		||||
import org.thingsboard.server.dao.DaoUtil;
 | 
			
		||||
@ -38,9 +37,7 @@ import org.thingsboard.server.dao.sql.JpaAbstractDao;
 | 
			
		||||
import org.thingsboard.server.dao.util.SqlDao;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
import java.util.stream.Collectors;
 | 
			
		||||
 | 
			
		||||
@Component
 | 
			
		||||
@SqlDao
 | 
			
		||||
@ -48,7 +45,6 @@ import java.util.stream.Collectors;
 | 
			
		||||
public class JpaNotificationRequestDao extends JpaAbstractDao<NotificationRequestEntity, NotificationRequest> implements NotificationRequestDao {
 | 
			
		||||
 | 
			
		||||
    private final NotificationRequestRepository notificationRequestRepository;
 | 
			
		||||
    private final NotificationRepository notificationRepository;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public PageData<NotificationRequest> findByTenantIdAndPageLink(TenantId tenantId, PageLink pageLink) {
 | 
			
		||||
@ -66,16 +62,9 @@ public class JpaNotificationRequestDao extends JpaAbstractDao<NotificationReques
 | 
			
		||||
        return DaoUtil.toPageData(notificationRequestRepository.findAllByStatus(status, DaoUtil.toPageable(pageLink)));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Transactional(readOnly = true)
 | 
			
		||||
    @Override
 | 
			
		||||
    public NotificationRequestInfo getNotificationRequestInfoById(TenantId tenantId, NotificationRequestId id) {
 | 
			
		||||
        NotificationRequestInfo notificationRequestInfo = new NotificationRequestInfo(findById(tenantId, id.getId()));
 | 
			
		||||
        notificationRequestInfo.setSent(notificationRepository.countByRequestId(id.getId()));
 | 
			
		||||
        notificationRequestInfo.setRead(notificationRepository.countByRequestIdAndStatus(id.getId(), NotificationStatus.READ));
 | 
			
		||||
        Map<String, NotificationStatus> statusesByRecipient = notificationRepository.getStatusesByRecipientForRequestId(id.getId()).stream()
 | 
			
		||||
                .collect(Collectors.toMap(r -> (String) r[0], r -> (NotificationStatus) r[1]));
 | 
			
		||||
        notificationRequestInfo.setStatusesByRecipient(statusesByRecipient);
 | 
			
		||||
        return notificationRequestInfo;
 | 
			
		||||
    public void updateStatsById(TenantId tenantId, NotificationRequestId notificationRequestId, NotificationRequestStats stats) {
 | 
			
		||||
        notificationRequestRepository.updateStatsById(notificationRequestId.getId(), JacksonUtil.valueToTree(stats));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
 | 
			
		||||
@ -48,12 +48,4 @@ public interface NotificationRepository extends JpaRepository<NotificationEntity
 | 
			
		||||
 | 
			
		||||
    Page<NotificationEntity> findByRequestId(UUID requestId, Pageable pageable);
 | 
			
		||||
 | 
			
		||||
    int countByRequestId(UUID requestId);
 | 
			
		||||
 | 
			
		||||
    int countByRequestIdAndStatus(UUID requestId, NotificationStatus status);
 | 
			
		||||
 | 
			
		||||
    @Query("SELECT u.email, n.status FROM NotificationEntity n INNER JOIN UserEntity u ON n.recipientId = u.id " +
 | 
			
		||||
            "WHERE n.requestId = :requestId")
 | 
			
		||||
    List<Object[]> getStatusesByRecipientForRequestId(@Param("requestId") UUID requestId);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -15,12 +15,15 @@
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.dao.sql.notification;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import org.springframework.data.domain.Page;
 | 
			
		||||
import org.springframework.data.domain.Pageable;
 | 
			
		||||
import org.springframework.data.jpa.repository.JpaRepository;
 | 
			
		||||
import org.springframework.data.jpa.repository.Modifying;
 | 
			
		||||
import org.springframework.data.jpa.repository.Query;
 | 
			
		||||
import org.springframework.data.repository.query.Param;
 | 
			
		||||
import org.springframework.stereotype.Repository;
 | 
			
		||||
import org.springframework.transaction.annotation.Transactional;
 | 
			
		||||
import org.thingsboard.server.common.data.EntityType;
 | 
			
		||||
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
 | 
			
		||||
import org.thingsboard.server.dao.model.sql.NotificationRequestEntity;
 | 
			
		||||
@ -40,4 +43,9 @@ public interface NotificationRequestRepository extends JpaRepository<Notificatio
 | 
			
		||||
 | 
			
		||||
    Page<NotificationRequestEntity> findAllByStatus(NotificationRequestStatus status, Pageable pageable);
 | 
			
		||||
 | 
			
		||||
    @Modifying
 | 
			
		||||
    @Transactional
 | 
			
		||||
    @Query("UPDATE NotificationRequestEntity r SET r.stats = :stats WHERE r.id = :id")
 | 
			
		||||
    void updateStatsById(@Param("id") UUID id, @Param("stats") JsonNode stats);
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -95,6 +95,13 @@ public class SqlPartitioningRepository {
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public long getLastPartitionEnd(String table, long before, long partitionDurationMs) {
 | 
			
		||||
        return fetchPartitions(table).stream()
 | 
			
		||||
                .mapToLong(partitionStartTime -> partitionStartTime + partitionDurationMs)
 | 
			
		||||
                .filter(partitionEndTime -> partitionEndTime < before)
 | 
			
		||||
                .max().orElse(0);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void cleanupPartitionsCache(String table, long expTime, long partitionDurationMs) {
 | 
			
		||||
        Map<Long, SqlPartition> partitions = tablesPartitions.get(table);
 | 
			
		||||
        if (partitions == null) return;
 | 
			
		||||
 | 
			
		||||
@ -821,7 +821,8 @@ CREATE TABLE IF NOT EXISTS notification_request (
 | 
			
		||||
    originator_entity_id UUID,
 | 
			
		||||
    originator_entity_type VARCHAR(32),
 | 
			
		||||
    rule_id UUID NULL CONSTRAINT fk_notification_request_rule_id REFERENCES notification_rule(id),
 | 
			
		||||
    status VARCHAR(32)
 | 
			
		||||
    status VARCHAR(32),
 | 
			
		||||
    stats VARCHAR(1000)
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
CREATE TABLE IF NOT EXISTS notification (
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user