More tests for notifications
This commit is contained in:
parent
165e86c82b
commit
6e8dba6e2c
@ -87,7 +87,7 @@ public class DefaultNotificationManager extends AbstractSubscriptionService impl
|
||||
notificationRequest.setTenantId(tenantId);
|
||||
if (notificationRequest.getAdditionalConfig() != null) {
|
||||
NotificationRequestConfig config = notificationRequest.getAdditionalConfig();
|
||||
if (config.getSendingDelayInMinutes() > 0 && notificationRequest.getId() == null) {
|
||||
if (config.getSendingDelayInSec() > 0 && notificationRequest.getId() == null) {
|
||||
notificationRequest.setStatus(NotificationRequestStatus.SCHEDULED);
|
||||
NotificationRequest savedNotificationRequest = notificationRequestService.saveNotificationRequest(tenantId, notificationRequest);
|
||||
forwardToNotificationSchedulerService(tenantId, savedNotificationRequest.getId(), false);
|
||||
|
||||
@ -97,7 +97,7 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
|
||||
}
|
||||
if (notificationRule.getEscalationConfig() != null) {
|
||||
for (NonConfirmedNotificationEscalation escalation : notificationRule.getEscalationConfig().getEscalations()) {
|
||||
submitNotificationRequest(tenantId, escalation.getNotificationTargetId(), notificationRule, alarm, escalation.getDelayInMinutes());
|
||||
submitNotificationRequest(tenantId, escalation.getNotificationTargetId(), notificationRule, alarm, escalation.getDelayInSec());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -116,10 +116,10 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
|
||||
return alarm.getStatus().isAck() && alarm.getStatus().isCleared();
|
||||
}
|
||||
|
||||
private void submitNotificationRequest(TenantId tenantId, NotificationTargetId targetId, NotificationRule notificationRule, Alarm alarm, int delayInMinutes) {
|
||||
private void submitNotificationRequest(TenantId tenantId, NotificationTargetId targetId, NotificationRule notificationRule, Alarm alarm, int delayInSec) {
|
||||
NotificationRequestConfig config = new NotificationRequestConfig();
|
||||
if (delayInMinutes > 0) {
|
||||
config.setSendingDelayInMinutes(delayInMinutes);
|
||||
if (delayInSec > 0) {
|
||||
config.setSendingDelayInSec(delayInSec);
|
||||
}
|
||||
NotificationInfo notificationInfo = constructNotificationInfo(alarm, notificationRule);
|
||||
|
||||
|
||||
@ -83,12 +83,16 @@ public class DefaultNotificationSchedulerService extends AbstractPartitionBasedS
|
||||
}
|
||||
|
||||
private void scheduleNotificationRequest(TenantId tenantId, NotificationRequest request, long requestTs) {
|
||||
int delayInMinutes = Optional.ofNullable(request)
|
||||
int delayInSec = Optional.ofNullable(request)
|
||||
.map(NotificationRequest::getAdditionalConfig)
|
||||
.map(NotificationRequestConfig::getSendingDelayInMinutes)
|
||||
.map(NotificationRequestConfig::getSendingDelayInSec)
|
||||
.orElse(0);
|
||||
if (delayInMinutes <= 0) return; // todo: think about: if server was down for some time and delayMs will be negative - need to send these requests as well (but when the value is within some range)
|
||||
long delayMs = TimeUnit.MINUTES.toMillis(delayInMinutes) - (System.currentTimeMillis() - requestTs);
|
||||
if (delayInSec <= 0) return;
|
||||
long delayInMs = TimeUnit.SECONDS.toMillis(delayInSec) - (System.currentTimeMillis() - requestTs);
|
||||
if (delayInMs < 0) { // in case the scheduled request processing time was during the downtime
|
||||
delayInMs = 0;
|
||||
// or maybe no need to process outdated notification requests ?
|
||||
}
|
||||
|
||||
ListenableScheduledFuture<?> scheduledTask = scheduledExecutor.schedule(() -> {
|
||||
NotificationRequest notificationRequest = notificationRequestService.findNotificationRequestById(tenantId, request.getId());
|
||||
@ -96,7 +100,7 @@ public class DefaultNotificationSchedulerService extends AbstractPartitionBasedS
|
||||
|
||||
notificationManager.processNotificationRequest(tenantId, notificationRequest);
|
||||
scheduledNotificationRequests.remove(notificationRequest.getId());
|
||||
}, delayMs, TimeUnit.MILLISECONDS);
|
||||
}, delayInMs, TimeUnit.MILLISECONDS);
|
||||
scheduledNotificationRequests.put(request.getId(), scheduledTask);
|
||||
}
|
||||
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.controller;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.java_websocket.client.WebSocketClient;
|
||||
import org.java_websocket.handshake.ServerHandshake;
|
||||
@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit;
|
||||
@Slf4j
|
||||
public class TbTestWebSocketClient extends WebSocketClient {
|
||||
|
||||
@Getter
|
||||
private volatile String lastMsg;
|
||||
private volatile CountDownLatch reply;
|
||||
private volatile CountDownLatch update;
|
||||
@ -107,10 +109,18 @@ public class TbTestWebSocketClient extends WebSocketClient {
|
||||
}
|
||||
|
||||
public String waitForUpdate() {
|
||||
return waitForUpdate(TimeUnit.SECONDS.toMillis(3));
|
||||
return waitForUpdate(false);
|
||||
}
|
||||
|
||||
public String waitForUpdate(boolean throwExceptionOnTimeout) {
|
||||
return waitForUpdate(TimeUnit.SECONDS.toMillis(3), throwExceptionOnTimeout);
|
||||
}
|
||||
|
||||
public String waitForUpdate(long ms) {
|
||||
return waitForUpdate(ms, false);
|
||||
}
|
||||
|
||||
public String waitForUpdate(long ms, boolean throwExceptionOnTimeout) {
|
||||
try {
|
||||
if (update.await(ms, TimeUnit.MILLISECONDS)) {
|
||||
return lastMsg;
|
||||
@ -118,10 +128,18 @@ public class TbTestWebSocketClient extends WebSocketClient {
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("Failed to await reply", e);
|
||||
}
|
||||
return null;
|
||||
if (throwExceptionOnTimeout) {
|
||||
throw new AssertionError("Waited for update for " + ms + " ms but none arrived");
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public String waitForReply() {
|
||||
return waitForReply(false);
|
||||
}
|
||||
|
||||
public String waitForReply(boolean throwExceptionOnTimeout) {
|
||||
try {
|
||||
if (reply.await(3, TimeUnit.SECONDS)) {
|
||||
return lastMsg;
|
||||
@ -129,7 +147,11 @@ public class TbTestWebSocketClient extends WebSocketClient {
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("Failed to await reply", e);
|
||||
}
|
||||
return null;
|
||||
if (throwExceptionOnTimeout) {
|
||||
throw new AssertionError("Waited for reply for 3 seconds but none arrived");
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public EntityDataUpdate parseDataReply(String msg) {
|
||||
|
||||
@ -15,14 +15,28 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.notification;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import org.assertj.core.data.Offset;
|
||||
import org.java_websocket.client.WebSocketClient;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.thingsboard.rule.engine.api.NotificationManager;
|
||||
import org.thingsboard.server.common.data.User;
|
||||
import org.thingsboard.server.common.data.id.NotificationRequestId;
|
||||
import org.thingsboard.server.common.data.id.NotificationTargetId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
import org.thingsboard.server.common.data.notification.Notification;
|
||||
import org.thingsboard.server.common.data.notification.NotificationInfo;
|
||||
import org.thingsboard.server.common.data.notification.NotificationRequest;
|
||||
import org.thingsboard.server.common.data.notification.NotificationRequestConfig;
|
||||
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
|
||||
import org.thingsboard.server.common.data.notification.targets.NotificationTarget;
|
||||
import org.thingsboard.server.common.data.notification.targets.SingleUserNotificationTargetConfig;
|
||||
import org.thingsboard.server.common.data.notification.targets.UserListNotificationTargetConfig;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.security.Authority;
|
||||
import org.thingsboard.server.controller.AbstractControllerTest;
|
||||
import org.thingsboard.server.controller.TbTestWebSocketClient;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
@ -30,13 +44,22 @@ import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsCou
|
||||
import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsUpdate;
|
||||
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
@DaoSqlTest
|
||||
public class NotificationApiTest extends AbstractControllerTest {
|
||||
|
||||
@Autowired
|
||||
private NotificationManager notificationManager;
|
||||
|
||||
@Before
|
||||
public void beforeEach() throws Exception {
|
||||
loginTenantAdmin();
|
||||
@ -46,12 +69,12 @@ public class NotificationApiTest extends AbstractControllerTest {
|
||||
public void testSubscribingToUnreadNotificationsCount() {
|
||||
NotificationTarget notificationTarget = createNotificationTarget(tenantAdminUserId);
|
||||
String notificationText1 = "Notification 1";
|
||||
submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText1);
|
||||
submitNotificationRequest(notificationTarget.getId(), notificationText1);
|
||||
String notificationText2 = "Notification 2";
|
||||
submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText2);
|
||||
submitNotificationRequest(notificationTarget.getId(), notificationText2);
|
||||
|
||||
getWsClient().subscribeForUnreadNotificationsCount();
|
||||
getWsClient().waitForReply();
|
||||
getWsClient().waitForReply(true);
|
||||
|
||||
UnreadNotificationsCountUpdate update = getWsClient().getLastCountUpdate();
|
||||
assertThat(update.getTotalUnreadCount()).isEqualTo(2);
|
||||
@ -61,17 +84,17 @@ public class NotificationApiTest extends AbstractControllerTest {
|
||||
public void testReceivingCountUpdates_multipleSessions() {
|
||||
getWsClient().subscribeForUnreadNotificationsCount();
|
||||
getAnotherWsClient().subscribeForUnreadNotificationsCount();
|
||||
getWsClient().waitForReply();
|
||||
getAnotherWsClient().waitForReply();
|
||||
getWsClient().waitForReply(true);
|
||||
getAnotherWsClient().waitForReply(true);
|
||||
assertThat(getWsClient().getLastCountUpdate().getTotalUnreadCount()).isZero();
|
||||
|
||||
getWsClient().registerWaitForUpdate();
|
||||
getAnotherWsClient().registerWaitForUpdate();
|
||||
NotificationTarget notificationTarget = createNotificationTarget(tenantAdminUserId);
|
||||
String notificationText = "Notification";
|
||||
submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText);
|
||||
getWsClient().waitForUpdate();
|
||||
getAnotherWsClient().waitForUpdate();
|
||||
submitNotificationRequest(notificationTarget.getId(), notificationText);
|
||||
getWsClient().waitForUpdate(true);
|
||||
getAnotherWsClient().waitForUpdate(true);
|
||||
|
||||
assertThat(getWsClient().getLastCountUpdate().getTotalUnreadCount()).isOne();
|
||||
assertThat(getAnotherWsClient().getLastCountUpdate().getTotalUnreadCount()).isOne();
|
||||
@ -81,14 +104,14 @@ public class NotificationApiTest extends AbstractControllerTest {
|
||||
public void testSubscribingToUnreadNotifications_multipleSessions() throws Exception {
|
||||
NotificationTarget notificationTarget = createNotificationTarget(tenantAdminUserId);
|
||||
String notificationText1 = "Notification 1";
|
||||
submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText1);
|
||||
submitNotificationRequest(notificationTarget.getId(), notificationText1);
|
||||
String notificationText2 = "Notification 2";
|
||||
submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText2);
|
||||
submitNotificationRequest(notificationTarget.getId(), notificationText2);
|
||||
|
||||
getWsClient().subscribeForUnreadNotifications(10);
|
||||
getAnotherWsClient().subscribeForUnreadNotifications(10);
|
||||
getWsClient().waitForReply();
|
||||
getAnotherWsClient().waitForReply();
|
||||
getWsClient().waitForReply(true);
|
||||
getAnotherWsClient().waitForReply(true);
|
||||
|
||||
checkFullNotificationsUpdate(getWsClient().getLastDataUpdate(), notificationText1, notificationText2);
|
||||
checkFullNotificationsUpdate(getAnotherWsClient().getLastDataUpdate(), notificationText1, notificationText2);
|
||||
@ -98,8 +121,8 @@ public class NotificationApiTest extends AbstractControllerTest {
|
||||
public void testReceivingNotificationUpdates_multipleSessions() {
|
||||
getWsClient().subscribeForUnreadNotifications(10);
|
||||
getAnotherWsClient().subscribeForUnreadNotifications(10);
|
||||
getWsClient().waitForReply();
|
||||
getAnotherWsClient().waitForReply();
|
||||
getWsClient().waitForReply(true);
|
||||
getAnotherWsClient().waitForReply(true);
|
||||
UnreadNotificationsUpdate notificationsUpdate = getWsClient().getLastDataUpdate();
|
||||
assertThat(notificationsUpdate.getTotalUnreadCount()).isZero();
|
||||
|
||||
@ -107,9 +130,9 @@ public class NotificationApiTest extends AbstractControllerTest {
|
||||
getAnotherWsClient().registerWaitForUpdate();
|
||||
NotificationTarget notificationTarget = createNotificationTarget(tenantAdminUserId);
|
||||
String notificationText = "Notification 1";
|
||||
submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText);
|
||||
assertThat(getWsClient().waitForUpdate()).isNotNull();
|
||||
assertThat(getAnotherWsClient().waitForUpdate()).isNotNull();
|
||||
submitNotificationRequest(notificationTarget.getId(), notificationText);
|
||||
getWsClient().waitForUpdate(true);
|
||||
getAnotherWsClient().waitForUpdate(true);
|
||||
|
||||
checkPartialNotificationsUpdate(getWsClient().getLastDataUpdate(), notificationText, 1);
|
||||
checkPartialNotificationsUpdate(getAnotherWsClient().getLastDataUpdate(), notificationText, 1);
|
||||
@ -119,26 +142,26 @@ public class NotificationApiTest extends AbstractControllerTest {
|
||||
public void testMarkingAsRead_multipleSessions() {
|
||||
getWsClient().subscribeForUnreadNotifications(10);
|
||||
getAnotherWsClient().subscribeForUnreadNotifications(10);
|
||||
getWsClient().waitForReply();
|
||||
getAnotherWsClient().waitForReply();
|
||||
getWsClient().waitForReply(true);
|
||||
getAnotherWsClient().waitForReply(true);
|
||||
getAnotherWsClient().subscribeForUnreadNotificationsCount();
|
||||
getAnotherWsClient().waitForReply();
|
||||
getAnotherWsClient().waitForReply(true);
|
||||
|
||||
NotificationTarget notificationTarget = createNotificationTarget(tenantAdminUserId);
|
||||
getWsClient().registerWaitForUpdate();
|
||||
getAnotherWsClient().registerWaitForUpdate(2);
|
||||
String notificationText1 = "Notification 1";
|
||||
submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText1);
|
||||
assertThat(getWsClient().waitForUpdate()).isNotNull();
|
||||
assertThat(getAnotherWsClient().waitForUpdate()).isNotNull();
|
||||
submitNotificationRequest(notificationTarget.getId(), notificationText1);
|
||||
getWsClient().waitForUpdate(true);
|
||||
getAnotherWsClient().waitForUpdate(true);
|
||||
Notification notification1 = getWsClient().getLastDataUpdate().getUpdate();
|
||||
|
||||
getWsClient().registerWaitForUpdate();
|
||||
getAnotherWsClient().registerWaitForUpdate(2);
|
||||
String notificationText2 = "Notification 2";
|
||||
submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText2);
|
||||
assertThat(getWsClient().waitForUpdate()).isNotNull();
|
||||
assertThat(getAnotherWsClient().waitForUpdate()).isNotNull();
|
||||
submitNotificationRequest(notificationTarget.getId(), notificationText2);
|
||||
getWsClient().waitForUpdate(true);
|
||||
getAnotherWsClient().waitForUpdate(true);
|
||||
assertThat(getWsClient().getLastDataUpdate().getTotalUnreadCount()).isEqualTo(2);
|
||||
assertThat(getAnotherWsClient().getLastDataUpdate().getTotalUnreadCount()).isEqualTo(2);
|
||||
assertThat(getAnotherWsClient().getLastCountUpdate().getTotalUnreadCount()).isEqualTo(2);
|
||||
@ -146,14 +169,139 @@ public class NotificationApiTest extends AbstractControllerTest {
|
||||
getWsClient().registerWaitForUpdate();
|
||||
getAnotherWsClient().registerWaitForUpdate(2);
|
||||
getWsClient().markNotificationAsRead(notification1.getUuidId());
|
||||
assertThat(getWsClient().waitForUpdate()).isNotNull();
|
||||
assertThat(getAnotherWsClient().waitForUpdate()).isNotNull();
|
||||
getWsClient().waitForUpdate(true);
|
||||
getAnotherWsClient().waitForUpdate(true);
|
||||
|
||||
checkFullNotificationsUpdate(getWsClient().getLastDataUpdate(), notificationText2);
|
||||
checkFullNotificationsUpdate(getAnotherWsClient().getLastDataUpdate(), notificationText2);
|
||||
assertThat(getAnotherWsClient().getLastCountUpdate().getTotalUnreadCount()).isOne();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelayedNotificationRequest() throws Exception {
|
||||
getWsClient().subscribeForUnreadNotifications(5);
|
||||
getWsClient().waitForReply(true);
|
||||
|
||||
getWsClient().registerWaitForUpdate();
|
||||
NotificationTarget notificationTarget = createNotificationTarget(tenantAdminUserId);
|
||||
String notificationText = "Was scheduled for 5 sec";
|
||||
NotificationRequest notificationRequest = submitNotificationRequest(notificationTarget.getId(), notificationText, 5);
|
||||
assertThat(notificationRequest.getStatus()).isEqualTo(NotificationRequestStatus.SCHEDULED);
|
||||
await().atLeast(4, TimeUnit.SECONDS)
|
||||
.atMost(6, TimeUnit.SECONDS)
|
||||
.until(() -> getWsClient().getLastMsg() != null);
|
||||
|
||||
Notification delayedNotification = getWsClient().getLastDataUpdate().getUpdate();
|
||||
assertThat(delayedNotification).extracting(Notification::getText).isEqualTo(notificationText);
|
||||
assertThat(delayedNotification.getCreatedTime() - notificationRequest.getCreatedTime())
|
||||
.isCloseTo(TimeUnit.SECONDS.toMillis(5), Offset.offset(500L));
|
||||
assertThat(findNotificationRequest(notificationRequest.getId()).getStatus()).isEqualTo(NotificationRequestStatus.PROCESSED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenNotificationRequestIsDeleted_thenDeleteNotifications() throws Exception {
|
||||
getWsClient().subscribeForUnreadNotifications(10);
|
||||
getWsClient().waitForReply(true);
|
||||
|
||||
getWsClient().registerWaitForUpdate();
|
||||
NotificationTarget notificationTarget = createNotificationTarget(tenantAdminUserId);
|
||||
NotificationRequest notificationRequest = submitNotificationRequest(notificationTarget.getId(), "Test");
|
||||
getWsClient().waitForUpdate(true);
|
||||
assertThat(getWsClient().getNotifications()).singleElement().extracting(Notification::getRequestId)
|
||||
.isEqualTo(notificationRequest.getId());
|
||||
assertThat(getWsClient().getUnreadCount()).isOne();
|
||||
|
||||
getWsClient().registerWaitForUpdate();
|
||||
deleteNotificationRequest(notificationRequest.getId());
|
||||
getWsClient().waitForUpdate(true);
|
||||
|
||||
assertThat(getWsClient().getNotifications()).isEmpty();
|
||||
assertThat(getWsClient().getUnreadCount()).isZero();
|
||||
assertThat(getMyNotifications(false, 10)).size().isZero();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void whenNotificationRequestIsUpdated_thenUpdateNotifications() throws Exception {
|
||||
getWsClient().subscribeForUnreadNotifications(10);
|
||||
getWsClient().waitForReply(true);
|
||||
|
||||
NotificationTarget notificationTarget = createNotificationTarget(tenantAdminUserId);
|
||||
String notificationText = "Text";
|
||||
getWsClient().registerWaitForUpdate();
|
||||
NotificationRequest notificationRequest = submitNotificationRequest(notificationTarget.getId(), notificationText);
|
||||
getWsClient().waitForUpdate(true);
|
||||
Notification initialNotification = getWsClient().getLastDataUpdate().getUpdate();
|
||||
assertThat(getMyNotifications(false, 10)).singleElement().isEqualTo(initialNotification);
|
||||
assertThat(initialNotification.getInfo()).isNotNull().isEqualTo(notificationRequest.getNotificationInfo());
|
||||
|
||||
getWsClient().registerWaitForUpdate();
|
||||
NotificationInfo newNotificationInfo = new NotificationInfo();
|
||||
newNotificationInfo.setDescription("New description");
|
||||
notificationRequest.setNotificationInfo(newNotificationInfo);
|
||||
notificationManager.updateNotificationRequest(tenantId, notificationRequest);
|
||||
getWsClient().waitForUpdate(true);
|
||||
Notification updatedNotification = getWsClient().getLastDataUpdate().getUpdate();
|
||||
assertThat(updatedNotification.getInfo()).isEqualTo(newNotificationInfo);
|
||||
assertThat(getMyNotifications(false, 10)).singleElement().isEqualTo(updatedNotification);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotificationUpdatesForUsersInTarget() throws Exception {
|
||||
Map<User, NotificationApiWsClient> wsSessions = createUsersAndSetUpWsSessions(150);
|
||||
wsSessions.forEach((user, wsClient) -> {
|
||||
wsClient.subscribeForUnreadNotifications(10);
|
||||
wsClient.waitForReply(true);
|
||||
wsClient.subscribeForUnreadNotificationsCount();
|
||||
wsClient.waitForReply(true);
|
||||
});
|
||||
|
||||
NotificationTarget notificationTarget = new NotificationTarget();
|
||||
UserListNotificationTargetConfig config = new UserListNotificationTargetConfig();
|
||||
config.setUsersIds(wsSessions.keySet().stream().map(User::getUuidId).collect(Collectors.toList()));
|
||||
notificationTarget.setName("150 users");
|
||||
notificationTarget.setTenantId(tenantId);
|
||||
notificationTarget.setConfiguration(config);
|
||||
notificationTarget = saveNotificationTarget(notificationTarget);
|
||||
|
||||
wsSessions.forEach((user, wsClient) -> wsClient.registerWaitForUpdate(2));
|
||||
NotificationRequest notificationRequest = submitNotificationRequest(notificationTarget.getId(), "Hello, ${recipientEmail}");
|
||||
await().atMost(3, TimeUnit.SECONDS)
|
||||
.pollDelay(1, TimeUnit.SECONDS).pollInterval(500, TimeUnit.MILLISECONDS)
|
||||
.until(() -> wsSessions.values().stream()
|
||||
.allMatch(wsClient -> wsClient.getLastDataUpdate() != null
|
||||
&& wsClient.getLastCountUpdate() != null));
|
||||
|
||||
wsSessions.forEach((user, wsClient) -> {
|
||||
assertThat(wsClient.getLastDataUpdate().getTotalUnreadCount()).isOne();
|
||||
assertThat(wsClient.getLastCountUpdate().getTotalUnreadCount()).isOne();
|
||||
|
||||
Notification notification = wsClient.getLastDataUpdate().getUpdate();
|
||||
assertThat(notification.getRecipientId()).isEqualTo(user.getId());
|
||||
assertThat(notification.getRequestId()).isEqualTo(notificationRequest.getId());
|
||||
assertThat(notification.getText()).isEqualTo("Hello, " + user.getEmail());
|
||||
});
|
||||
wsSessions.values().forEach(WebSocketClient::close);
|
||||
}
|
||||
|
||||
private Map<User, NotificationApiWsClient> createUsersAndSetUpWsSessions(int count) throws Exception {
|
||||
List<User> users = new LinkedList<>();
|
||||
for (int i = 1; i <= count; i++) {
|
||||
User user = new User();
|
||||
user.setTenantId(tenantId);
|
||||
user.setAuthority(Authority.TENANT_ADMIN);
|
||||
user.setEmail("test-user-" + i + "@thingsboard.org");
|
||||
users.add(createUser(user, "12345678"));
|
||||
}
|
||||
Map<User, NotificationApiWsClient> wsSessions = new HashMap<>();
|
||||
for (User user : users) {
|
||||
login(user.getEmail(), "12345678");
|
||||
NotificationApiWsClient wsClient = (NotificationApiWsClient) buildAndConnectWebSocketClient();
|
||||
wsSessions.put(user, wsClient);
|
||||
}
|
||||
loginTenantAdmin();
|
||||
return wsSessions;
|
||||
}
|
||||
|
||||
|
||||
private void checkFullNotificationsUpdate(UnreadNotificationsUpdate notificationsUpdate, String... expectedNotifications) {
|
||||
assertThat(notificationsUpdate.getNotifications()).extracting(Notification::getText).containsOnly(expectedNotifications);
|
||||
@ -172,19 +320,46 @@ public class NotificationApiTest extends AbstractControllerTest {
|
||||
SingleUserNotificationTargetConfig config = new SingleUserNotificationTargetConfig();
|
||||
config.setUserId(userId.getId());
|
||||
notificationTarget.setConfiguration(config);
|
||||
return saveNotificationTarget(notificationTarget);
|
||||
}
|
||||
|
||||
private NotificationTarget saveNotificationTarget(NotificationTarget notificationTarget) {
|
||||
return doPost("/api/notification/target", notificationTarget, NotificationTarget.class);
|
||||
}
|
||||
|
||||
private NotificationRequest submitNotificationRequest(NotificationTargetId targetId, String notificationReason, String text) {
|
||||
private NotificationRequest submitNotificationRequest(NotificationTargetId targetId, String text) {
|
||||
return submitNotificationRequest(targetId, text, 0);
|
||||
}
|
||||
|
||||
private NotificationRequest submitNotificationRequest(NotificationTargetId targetId, String text, int delayInSec) {
|
||||
NotificationRequestConfig config = new NotificationRequestConfig();
|
||||
config.setSendingDelayInSec(delayInSec);
|
||||
NotificationInfo notificationInfo = new NotificationInfo();
|
||||
notificationInfo.setDescription("The text: " + text);
|
||||
NotificationRequest notificationRequest = NotificationRequest.builder()
|
||||
.tenantId(tenantId)
|
||||
.targetId(targetId)
|
||||
.notificationReason(notificationReason)
|
||||
.notificationReason("Test")
|
||||
.textTemplate(text)
|
||||
.notificationInfo(notificationInfo)
|
||||
.additionalConfig(config)
|
||||
.build();
|
||||
return doPost("/api/notification/request", notificationRequest, NotificationRequest.class);
|
||||
}
|
||||
|
||||
private NotificationRequest findNotificationRequest(NotificationRequestId id) throws Exception {
|
||||
return doGet("/api/notification/request/" + id, NotificationRequest.class);
|
||||
}
|
||||
|
||||
private void deleteNotificationRequest(NotificationRequestId id) throws Exception {
|
||||
doDelete("/api/notification/request/" + id);
|
||||
}
|
||||
|
||||
private List<Notification> getMyNotifications(boolean unreadOnly, int limit) throws Exception {
|
||||
return doGetTypedWithPageLink("/api/notifications?unreadOnly={unreadOnly}&", new TypeReference<PageData<Notification>>() {},
|
||||
new PageLink(limit, 0), unreadOnly).getData();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TbTestWebSocketClient buildAndConnectWebSocketClient() throws URISyntaxException, InterruptedException {
|
||||
NotificationApiWsClient wsClient = new NotificationApiWsClient(WS_URL + wsPort, token);
|
||||
|
||||
@ -76,6 +76,13 @@ public class NotificationApiWsClient extends TbTestWebSocketClient {
|
||||
send(cmd);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerWaitForUpdate(int count) {
|
||||
lastDataUpdate = null;
|
||||
lastCountUpdate = null;
|
||||
super.registerWaitForUpdate(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(String s) {
|
||||
JsonNode update = JacksonUtil.toJsonNode(s);
|
||||
|
||||
@ -0,0 +1,86 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.notification;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.thingsboard.rest.client.RestClient;
|
||||
import org.thingsboard.server.common.data.notification.AlarmOriginatedNotificationInfo;
|
||||
import org.thingsboard.server.common.data.notification.Notification;
|
||||
import org.thingsboard.server.common.data.notification.NotificationInfo;
|
||||
import org.thingsboard.server.common.data.notification.NotificationOriginatorType;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Scanner;
|
||||
|
||||
public class NotificationsClient extends NotificationApiWsClient {
|
||||
|
||||
private NotificationsClient(String wsUrl, String token) throws Exception {
|
||||
super(wsUrl, token);
|
||||
}
|
||||
|
||||
public static NotificationsClient newInstance(String username, String password) throws Exception {
|
||||
RestClient restClient = new RestClient("http://localhost:8080");
|
||||
restClient.login(username, password);
|
||||
NotificationsClient client = new NotificationsClient("ws://localhost:8080", restClient.getToken());
|
||||
client.connectBlocking();
|
||||
return client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(String s) {
|
||||
super.onMessage(s);
|
||||
// printNotificationsCount();
|
||||
printNotifications();
|
||||
}
|
||||
|
||||
public void printNotifications() {
|
||||
System.out.println(StringUtils.repeat(System.lineSeparator(), 20));
|
||||
List<Notification> notifications = getNotifications();
|
||||
System.out.printf(" %s NEW MESSAGE%s\n\n", getUnreadCount(), notifications.size() > 1 ? "S" : "");
|
||||
notifications.forEach(notification -> {
|
||||
String notificationInfoStr = "";
|
||||
if (notification.getOriginatorType() == NotificationOriginatorType.ALARM) {
|
||||
AlarmOriginatedNotificationInfo info = (AlarmOriginatedNotificationInfo) notification.getInfo();
|
||||
notificationInfoStr = String.format("Alarm of type %s - %s severity - status: %s",
|
||||
info.getAlarmType(), info.getAlarmSeverity(), info.getAlarmStatus());
|
||||
} else if (notification.getInfo() != null) {
|
||||
notificationInfoStr = Strings.nullToEmpty(notification.getInfo().getDescription());
|
||||
}
|
||||
SimpleDateFormat format = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
|
||||
String time = format.format(new Date(notification.getCreatedTime()));
|
||||
System.out.printf("[%s] %-19s | %-30s | (%s)\n", time, notification.getReason(), notification.getText(), notificationInfoStr);
|
||||
});
|
||||
System.out.println(StringUtils.repeat(System.lineSeparator(), 5));
|
||||
}
|
||||
|
||||
public void printNotificationsCount() {
|
||||
System.out.println();
|
||||
System.out.println();
|
||||
System.out.println();
|
||||
int unreadCount = getUnreadCount();
|
||||
System.out.printf("\r\r%s NEW MESSAGE%s", unreadCount, unreadCount > 1 ? "S" : "");
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
NotificationsClient client = NotificationsClient.newInstance("tenant@thingsboard.org", "tenant");
|
||||
client.subscribeForUnreadNotifications(5);
|
||||
// client.subscribeForUnreadNotificationsCount();
|
||||
new Scanner(System.in).nextLine();
|
||||
}
|
||||
}
|
||||
@ -19,5 +19,5 @@ import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class NotificationRequestConfig {
|
||||
private int sendingDelayInMinutes;
|
||||
private int sendingDelayInSec;
|
||||
}
|
||||
|
||||
@ -25,7 +25,7 @@ import javax.validation.constraints.NotNull;
|
||||
public class NonConfirmedNotificationEscalation {
|
||||
|
||||
@Min(1)
|
||||
private int delayInMinutes; // delay since initial notification request // if no one from previous escalation item has read the notification, send notifications after this time to other recipients
|
||||
private int delayInSec;
|
||||
@NotNull
|
||||
private NotificationTargetId notificationTargetId;
|
||||
|
||||
|
||||
@ -784,7 +784,7 @@ CREATE TABLE IF NOT EXISTS notification_target (
|
||||
created_time BIGINT NOT NULL,
|
||||
tenant_id UUID NOT NULL,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
configuration varchar(1000) NOT NULL
|
||||
configuration VARCHAR NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS notification_rule (
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user