Add indexes for notification entities; refactoring

This commit is contained in:
ViacheslavKlimov 2023-01-08 13:42:43 +02:00
parent 1317a8aca9
commit 5c5d683236
13 changed files with 100 additions and 34 deletions

View File

@ -32,6 +32,7 @@ CREATE TABLE IF NOT EXISTS notification_template (
notification_subject VARCHAR(255),
configuration VARCHAR(10000) NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_notification_template_tenant_id_created_time ON notification_template(tenant_id, created_time DESC);
CREATE TABLE IF NOT EXISTS notification_rule (
id UUID NOT NULL CONSTRAINT notification_rule_pkey PRIMARY KEY,
@ -42,12 +43,13 @@ CREATE TABLE IF NOT EXISTS notification_rule (
delivery_methods VARCHAR(255) NOT NULL,
configuration VARCHAR(2000) NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_notification_rule_tenant_id_created_time ON notification_rule(tenant_id, created_time DESC);
CREATE TABLE IF NOT EXISTS notification_request (
id UUID NOT NULL CONSTRAINT notification_request_pkey PRIMARY KEY,
created_time BIGINT NOT NULL,
tenant_id UUID NULL CONSTRAINT fk_notification_request_tenant_id REFERENCES tenant(id) ON DELETE CASCADE,
targets VARCHAR(5000) NOT NULL,
targets VARCHAR(10000) NOT NULL,
template_id UUID NOT NULL,
info VARCHAR(1000),
delivery_methods VARCHAR(255),
@ -60,6 +62,8 @@ CREATE TABLE IF NOT EXISTS notification_request (
stats VARCHAR(10000)
);
CREATE INDEX IF NOT EXISTS idx_notification_request_tenant_id_originator_type_created_time ON notification_request(tenant_id, originator_type, created_time DESC);
CREATE INDEX IF NOT EXISTS idx_notification_request_rule_id_originator_entity_id ON notification_request(rule_id, originator_entity_id);
CREATE INDEX IF NOT EXISTS idx_notification_request_status ON notification_request(status);
CREATE TABLE IF NOT EXISTS notification (
id UUID NOT NULL,
@ -73,9 +77,8 @@ CREATE TABLE IF NOT EXISTS notification (
originator_type VARCHAR(32) NOT NULL,
status VARCHAR(32)
) PARTITION BY RANGE (created_time);
CREATE INDEX IF NOT EXISTS idx_notification_id ON notification(id);
CREATE INDEX IF NOT EXISTS idx_notification_recipient_id_created_time ON notification(recipient_id, created_time DESC);
CREATE INDEX IF NOT EXISTS idx_notification_notification_request_id ON notification(request_id);
CREATE INDEX IF NOT EXISTS idx_notification_id_recipient_id ON notification(id, recipient_id);
CREATE INDEX IF NOT EXISTS idx_notification_recipient_id_status_created_time ON notification(recipient_id, status, created_time DESC);
ALTER TABLE alarm ADD COLUMN IF NOT EXISTS notification_rule_id UUID;

View File

@ -130,7 +130,7 @@ public class NotificationController extends BaseController {
@RequestParam(required = false) String sortOrder,
@AuthenticationPrincipal SecurityUser user) throws ThingsboardException {
PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder);
return notificationRequestService.findNotificationRequestsByTenantId(user.getTenantId(), pageLink);
return notificationRequestService.findNotificationRequestsByTenantIdAndOriginatorType(user.getTenantId(), NotificationOriginatorType.ADMIN, pageLink);
}
@DeleteMapping("/notification/request/{id}")

View File

@ -70,6 +70,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@Service
@ -139,10 +140,9 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple
return notificationTargetService.findRecipientsForNotificationTarget(tenantId, ctx.getCustomerId(), targetId, pageLink);
}, 200, recipientsBatch -> {
for (NotificationDeliveryMethod deliveryMethod : savedNotificationRequest.getDeliveryMethods()) {
NotificationChannel notificationChannel = channels.get(deliveryMethod);
log.debug("Sending {} notifications for request {} to recipients batch", deliveryMethod, savedNotificationRequest.getId());
List<User> recipients = recipientsBatch.getData();
log.debug("Sending {} notifications for request {} to recipients batch ({})", deliveryMethod, savedNotificationRequest.getId(), recipients.size());
NotificationChannel notificationChannel = channels.get(deliveryMethod);
for (User recipient : recipients) {
ListenableFuture<Void> resultFuture = processForRecipient(notificationChannel, recipient, ctx);
DonAsynchron.withCallback(resultFuture, result -> {
@ -167,7 +167,7 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple
UserId senderId = notificationRequest.getSenderId();
if (senderId != null) {
if (stats.getErrors().isEmpty()) {
int sent = stats.getSent().values().stream().mapToInt(Set::size).sum();
int sent = stats.getSent().values().stream().mapToInt(AtomicInteger::get).sum();
sendBasicNotification(tenantId, senderId, NotificationType.COMPLETED, "Notifications sent",
"All notifications were successfully sent (" + sent + ")");
} else {

View File

@ -207,7 +207,6 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
Notification notification = update.getNotification();
switch (update.getUpdateType()) {
case CREATED: {
System.err.println("NotificationsCountSubscription CREATED");
subscription.getUnreadCounter().incrementAndGet();
sendUpdate(subscription.getSessionId(), subscription.createUpdate());
break;

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.notification;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.data.Offset;
import org.java_websocket.client.WebSocketClient;
@ -30,9 +31,15 @@ import org.thingsboard.server.common.data.notification.NotificationInfo;
import org.thingsboard.server.common.data.notification.NotificationRequest;
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
import org.thingsboard.server.common.data.notification.targets.AllUsersNotificationTargetConfig;
import org.thingsboard.server.common.data.notification.targets.NotificationTarget;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.dao.notification.NotificationDao;
import org.thingsboard.server.dao.notification.NotificationRequestDao;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsCountUpdate;
import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsUpdate;
@ -41,6 +48,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@ -57,6 +65,12 @@ public class NotificationApiTest extends AbstractNotificationApiTest {
@Autowired
private NotificationCenter notificationCenter;
@Autowired
private NotificationDao notificationDao;
@Autowired
private NotificationRequestDao notificationRequestDao;
@Autowired
private DbCallbackExecutorService executor;
@Before
public void beforeEach() throws Exception {
@ -251,7 +265,7 @@ public class NotificationApiTest extends AbstractNotificationApiTest {
@Test
public void testNotificationUpdatesForALotOfUsers() throws Exception {
int usersCount = 100; // FIXME: sometimes if set e.g. to 150, up to 5 WS sessions don't receive update
int usersCount = 200; // FIXME: sometimes if set e.g. to 150, up to 5 WS sessions don't receive update
Map<User, NotificationApiWsClient> sessions = new HashMap<>();
List<NotificationTargetId> targets = new ArrayList<>();
@ -278,7 +292,7 @@ public class NotificationApiTest extends AbstractNotificationApiTest {
sessions.forEach((user, wsClient) -> wsClient.registerWaitForUpdate(2));
NotificationRequest notificationRequest = submitNotificationRequest(targets, "Hello, ${email}", 0,
NotificationDeliveryMethod.PUSH, NotificationDeliveryMethod.EMAIL);
await().atMost(20, TimeUnit.SECONDS)
await().atMost(10, TimeUnit.SECONDS)
.pollDelay(1, TimeUnit.SECONDS).pollInterval(500, TimeUnit.MILLISECONDS)
.until(() -> {
long receivedUpdate = sessions.values().stream()
@ -303,10 +317,8 @@ public class NotificationApiTest extends AbstractNotificationApiTest {
await().atMost(2, TimeUnit.SECONDS)
.until(() -> findNotificationRequest(notificationRequest.getId()).getStats() != null);
NotificationRequestStats stats = findNotificationRequest(notificationRequest.getId()).getStats();
assertThat(stats.getSent().get(NotificationDeliveryMethod.PUSH))
.containsAll(sessions.keySet().stream().map(User::getEmail).collect(Collectors.toSet()));
assertThat(stats.getSent().get(NotificationDeliveryMethod.EMAIL))
.containsAll(sessions.keySet().stream().map(User::getEmail).collect(Collectors.toSet()));
assertThat(stats.getSent().get(NotificationDeliveryMethod.PUSH)).hasValue(usersCount);
assertThat(stats.getSent().get(NotificationDeliveryMethod.EMAIL)).hasValue(usersCount);
sessions.values().forEach(wsClient -> wsClient.registerWaitForUpdate(2));
deleteNotificationRequest(notificationRequest.getId());
@ -335,11 +347,54 @@ public class NotificationApiTest extends AbstractNotificationApiTest {
.until(() -> findNotificationRequest(notificationRequest.getId()).getStats() != null);
NotificationRequestStats stats = findNotificationRequest(notificationRequest.getId()).getStats();
assertThat(stats.getSent().get(NotificationDeliveryMethod.PUSH)).containsOnly(CUSTOMER_USER_EMAIL);
assertThat(stats.getSent().get(NotificationDeliveryMethod.EMAIL)).containsOnly(CUSTOMER_USER_EMAIL);
assertThat(stats.getSent().get(NotificationDeliveryMethod.PUSH)).hasValue(1);
assertThat(stats.getSent().get(NotificationDeliveryMethod.EMAIL)).hasValue(1);
assertThat(stats.getErrors().get(NotificationDeliveryMethod.SMS)).size().isOne();
}
@Test
public void testNotificationsForALotOfUsers() throws Exception {
int usersCount = 7000;
List<User> users = new ArrayList<>();
for (int i = 1; i <= usersCount; i++) {
User user = new User();
user.setTenantId(tenantId);
user.setAuthority(Authority.TENANT_ADMIN);
user.setEmail("test-user-" + i + "@thingsboard.org");
user = doPost("/api/user", user, User.class);
System.err.println(i);
users.add(user);
}
NotificationTarget notificationTarget = new NotificationTarget();
notificationTarget.setTenantId(tenantId);
notificationTarget.setName("All my users");
AllUsersNotificationTargetConfig config = new AllUsersNotificationTargetConfig();
notificationTarget.setConfiguration(config);
notificationTarget = saveNotificationTarget(notificationTarget);
NotificationTargetId notificationTargetId = notificationTarget.getId();
ListenableFuture<NotificationRequest> request = executor.submit(() -> {
return submitNotificationRequest(notificationTargetId, "Hello, ${email}", 0, NotificationDeliveryMethod.PUSH);
});
await().atMost(10, TimeUnit.SECONDS).until(request::isDone);
NotificationRequest notificationRequest = request.get();
await().atMost(5, TimeUnit.SECONDS)
.pollInterval(200, TimeUnit.MILLISECONDS)
.until(() -> {
PageData<Notification> sentNotifications = notificationDao.findByRequestId(tenantId, notificationRequest.getId(), new PageLink(1));
return sentNotifications.getTotalElements() >= usersCount;
});
PageData<Notification> sentNotifications = notificationDao.findByRequestId(tenantId, notificationRequest.getId(), new PageLink(Integer.MAX_VALUE));
assertThat(sentNotifications.getData()).extracting(Notification::getRecipientId)
.containsAll(users.stream().map(User::getId).collect(Collectors.toSet()));
NotificationRequestStats stats = findNotificationRequest(notificationRequest.getId()).getStats();
assertThat(stats.getSent().values().stream().mapToInt(AtomicInteger::get).sum()).isGreaterThanOrEqualTo(usersCount);
}
private void checkFullNotificationsUpdate(UnreadNotificationsUpdate notificationsUpdate, String... expectedNotifications) {
assertThat(notificationsUpdate.getNotifications()).extracting(Notification::getText).containsOnly(expectedNotifications);

View File

@ -19,6 +19,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.NotificationRequestId;
import org.thingsboard.server.common.data.id.NotificationRuleId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.NotificationOriginatorType;
import org.thingsboard.server.common.data.notification.NotificationRequest;
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
@ -33,7 +34,7 @@ public interface NotificationRequestService {
NotificationRequest findNotificationRequestById(TenantId tenantId, NotificationRequestId id);
PageData<NotificationRequest> findNotificationRequestsByTenantId(TenantId tenantId, PageLink pageLink);
PageData<NotificationRequest> findNotificationRequestsByTenantIdAndOriginatorType(TenantId tenantId, NotificationOriginatorType originatorType, PageLink pageLink);
List<NotificationRequestId> findNotificationRequestsIdsByStatusAndRuleId(TenantId tenantId, NotificationRequestStatus requestStatus, NotificationRuleId ruleId);

View File

@ -26,11 +26,12 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Data
public class NotificationRequestStats {
private final Map<NotificationDeliveryMethod, Set<String>> sent;
private final Map<NotificationDeliveryMethod, AtomicInteger> sent;
private final Map<NotificationDeliveryMethod, Map<String, String>> errors;
@JsonIgnore
private final Map<NotificationDeliveryMethod, Set<UserId>> processedRecipients;
@ -42,7 +43,7 @@ public class NotificationRequestStats {
}
@JsonCreator
public NotificationRequestStats(@JsonProperty("sent") Map<NotificationDeliveryMethod, Set<String>> sent,
public NotificationRequestStats(@JsonProperty("sent") Map<NotificationDeliveryMethod, AtomicInteger> sent,
@JsonProperty("errors") Map<NotificationDeliveryMethod, Map<String, String>> errors) {
this.sent = sent;
this.errors = errors;
@ -50,7 +51,7 @@ public class NotificationRequestStats {
}
public void reportSent(NotificationDeliveryMethod deliveryMethod, User recipient) {
sent.computeIfAbsent(deliveryMethod, k -> ConcurrentHashMap.newKeySet()).add(recipient.getEmail());
sent.computeIfAbsent(deliveryMethod, k -> new AtomicInteger()).incrementAndGet();
processedRecipients.computeIfAbsent(deliveryMethod, k -> ConcurrentHashMap.newKeySet()).add(recipient.getId());
}

View File

@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.NotificationRequestId;
import org.thingsboard.server.common.data.id.NotificationRuleId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.NotificationOriginatorType;
import org.thingsboard.server.common.data.notification.NotificationRequest;
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
@ -52,8 +53,8 @@ public class DefaultNotificationRequestService implements NotificationRequestSer
}
@Override
public PageData<NotificationRequest> findNotificationRequestsByTenantId(TenantId tenantId, PageLink pageLink) {
return notificationRequestDao.findByTenantIdAndPageLink(tenantId, pageLink);
public PageData<NotificationRequest> findNotificationRequestsByTenantIdAndOriginatorType(TenantId tenantId, NotificationOriginatorType originatorType, PageLink pageLink) {
return notificationRequestDao.findByTenantIdAndOriginatorTypeAndPageLink(tenantId, originatorType, pageLink);
}
@Override

View File

@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.id.NotificationRuleId;
import org.thingsboard.server.common.data.id.NotificationTargetId;
import org.thingsboard.server.common.data.id.NotificationTemplateId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.NotificationOriginatorType;
import org.thingsboard.server.common.data.notification.NotificationRequest;
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
@ -32,7 +33,7 @@ import java.util.List;
public interface NotificationRequestDao extends Dao<NotificationRequest> {
PageData<NotificationRequest> findByTenantIdAndPageLink(TenantId tenantId, PageLink pageLink);
PageData<NotificationRequest> findByTenantIdAndOriginatorTypeAndPageLink(TenantId tenantId, NotificationOriginatorType originatorType, PageLink pageLink);
List<NotificationRequestId> findIdsByRuleId(TenantId tenantId, NotificationRequestStatus requestStatus, NotificationRuleId ruleId);

View File

@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.id.NotificationRuleId;
import org.thingsboard.server.common.data.id.NotificationTargetId;
import org.thingsboard.server.common.data.id.NotificationTemplateId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.NotificationOriginatorType;
import org.thingsboard.server.common.data.notification.NotificationRequest;
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
@ -51,8 +52,8 @@ public class JpaNotificationRequestDao extends JpaAbstractDao<NotificationReques
private final NotificationRequestRepository notificationRequestRepository;
@Override
public PageData<NotificationRequest> findByTenantIdAndPageLink(TenantId tenantId, PageLink pageLink) {
return DaoUtil.toPageData(notificationRequestRepository.findByTenantId(getId(tenantId, true), DaoUtil.toPageable(pageLink)));
public PageData<NotificationRequest> findByTenantIdAndOriginatorTypeAndPageLink(TenantId tenantId, NotificationOriginatorType originatorType, PageLink pageLink) {
return DaoUtil.toPageData(notificationRequestRepository.findByTenantIdAndOriginatorType(getId(tenantId, true), originatorType, DaoUtil.toPageable(pageLink)));
}
@Override

View File

@ -25,6 +25,7 @@ import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.notification.NotificationOriginatorType;
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
import org.thingsboard.server.dao.model.sql.NotificationRequestEntity;
@ -34,7 +35,7 @@ import java.util.UUID;
@Repository
public interface NotificationRequestRepository extends JpaRepository<NotificationRequestEntity, UUID> {
Page<NotificationRequestEntity> findByTenantId(UUID tenantId, Pageable pageable);
Page<NotificationRequestEntity> findByTenantIdAndOriginatorType(UUID tenantId, NotificationOriginatorType originatorType, Pageable pageable);
@Query("SELECT r.id FROM NotificationRequestEntity r WHERE r.status = :status AND r.ruleId = :ruleId")
List<UUID> findAllIdsByStatusAndRuleId(@Param("status") NotificationRequestStatus status,

View File

@ -82,10 +82,13 @@ CREATE INDEX IF NOT EXISTS idx_api_usage_state_entity_id ON api_usage_state(enti
CREATE INDEX IF NOT EXISTS idx_notification_target_tenant_id_created_time ON notification_target(tenant_id, created_time DESC);
CREATE INDEX IF NOT EXISTS idx_notification_template_tenant_id_created_time ON notification_template(tenant_id, created_time DESC);
CREATE INDEX IF NOT EXISTS idx_notification_rule_tenant_id_created_time ON notification_rule(tenant_id, created_time DESC);
CREATE INDEX IF NOT EXISTS idx_notification_request_tenant_id_originator_type_created_time ON notification_request(tenant_id, originator_type, created_time DESC);
CREATE INDEX IF NOT EXISTS idx_notification_request_rule_id_originator_entity_id ON notification_request(rule_id, originator_entity_id);
CREATE INDEX IF NOT EXISTS idx_notification_request_status ON notification_request(status);
CREATE INDEX IF NOT EXISTS idx_notification_id ON notification(id);
CREATE INDEX IF NOT EXISTS idx_notification_recipient_id_created_time ON notification(recipient_id, created_time DESC);
CREATE INDEX IF NOT EXISTS idx_notification_notification_request_id ON notification(request_id);
CREATE INDEX IF NOT EXISTS idx_notification_id_recipient_id ON notification(id, recipient_id);
CREATE INDEX IF NOT EXISTS idx_notification_recipient_id_status_created_time ON notification(recipient_id, status, created_time DESC);

View File

@ -812,7 +812,7 @@ CREATE TABLE IF NOT EXISTS notification_request (
id UUID NOT NULL CONSTRAINT notification_request_pkey PRIMARY KEY,
created_time BIGINT NOT NULL,
tenant_id UUID NULL CONSTRAINT fk_notification_request_tenant_id REFERENCES tenant(id) ON DELETE CASCADE,
targets VARCHAR(5000) NOT NULL,
targets VARCHAR(10000) NOT NULL,
template_id UUID NOT NULL,
info VARCHAR(1000),
delivery_methods VARCHAR(255),