diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index c43999aaf3..f1f9c8f0c8 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -251,7 +251,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc ctx.cancelTasks(); ctx.clearEntitySubscriptions(); if (entities.isEmpty()) { - AlarmDataUpdate update = new AlarmDataUpdate(cmd.getCmdId(), new PageData<>(), null, false); + AlarmDataUpdate update = new AlarmDataUpdate(cmd.getCmdId(), new PageData<>(), null, 0, 0); wsService.sendWsMsg(ctx.getSessionId(), update); } else { ctx.fetchAlarms(); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java index d9824a4655..d6a7abc063 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java @@ -88,7 +88,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { public void fetchAlarms() { AlarmDataUpdate update; - if(!entitiesMap.isEmpty()) { + if (!entitiesMap.isEmpty()) { long start = System.currentTimeMillis(); PageData alarms = alarmService.findAlarmDataByQueryForEntities(getTenantId(), getCustomerId(), query, getOrderedEntityIds()); @@ -96,9 +96,9 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { stats.getAlarmQueryInvocationCnt().incrementAndGet(); stats.getAlarmQueryTimeSpent().addAndGet(end - start); alarms = setAndMergeAlarmsData(alarms); - update = new AlarmDataUpdate(cmdId, alarms, null, tooManyEntities); + update = new AlarmDataUpdate(cmdId, alarms, null, maxEntitiesPerAlarmSubscription, data.getTotalElements()); } else { - update = new AlarmDataUpdate(cmdId, new PageData<>(), null, false); + update = new AlarmDataUpdate(cmdId, new PageData<>(), null, maxEntitiesPerAlarmSubscription, data.getTotalElements()); } wsService.sendWsMsg(getSessionId(), update); } @@ -178,7 +178,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { alarm.getLatest().computeIfAbsent(keyType, tmp -> new HashMap<>()).putAll(latestUpdate); return alarm; }).collect(Collectors.toList()); - wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, update, tooManyEntities)); + wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, update, maxEntitiesPerAlarmSubscription, data.getTotalElements())); } else { log.trace("[{}][{}][{}][{}] Received stale subscription update: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), keyType, subscriptionUpdate); } @@ -201,7 +201,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { AlarmData updated = new AlarmData(alarm, current.getOriginatorName(), current.getEntityId()); updated.getLatest().putAll(current.getLatest()); alarmsMap.put(alarmId, updated); - wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated), tooManyEntities)); + wsService.sendWsMsg(sessionId, new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated), maxEntitiesPerAlarmSubscription, data.getTotalElements())); } else { fetchAlarms(); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index 00540ce02b..16e6bca911 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -108,6 +108,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi private static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!"; private static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!"; private static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!"; + private static final String FAILED_TO_PARSE_WS_COMMAND = "Failed to parse websocket command!"; private final ConcurrentMap wsSessionsMap = new ConcurrentHashMap<>(); @@ -224,8 +225,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } } catch (IOException e) { log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e); - TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(UNKNOWN_SUBSCRIPTION_ID, SubscriptionErrorCode.INTERNAL_ERROR, SESSION_META_DATA_NOT_FOUND); - sendWsMsg(sessionRef, update); + sendWsMsg(sessionRef, new TelemetrySubscriptionUpdate(UNKNOWN_SUBSCRIPTION_ID, SubscriptionErrorCode.BAD_REQUEST, FAILED_TO_PARSE_WS_COMMAND)); } } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/AlarmDataUpdate.java b/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/AlarmDataUpdate.java index e928068d35..7a55d4bf75 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/AlarmDataUpdate.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/AlarmDataUpdate.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.telemetry.cmd.v2; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Getter; import lombok.NoArgsConstructor; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.query.AlarmData; @@ -27,11 +28,15 @@ import java.util.List; public class AlarmDataUpdate extends DataUpdate { - private boolean tooManyEntities; + @Getter + private long allowedEntities; + @Getter + private long totalEntities; - public AlarmDataUpdate(int cmdId, PageData data, List update, boolean tooManyEntities) { + public AlarmDataUpdate(int cmdId, PageData data, List update, long allowedEntities, long totalEntities) { super(cmdId, data, update, SubscriptionErrorCode.NO_ERROR.getCode(), null); - this.tooManyEntities = tooManyEntities; + this.allowedEntities = allowedEntities; + this.totalEntities = totalEntities; } public AlarmDataUpdate(int cmdId, int errorCode, String errorMsg) { @@ -49,8 +54,10 @@ public class AlarmDataUpdate extends DataUpdate { @JsonProperty("update") List update, @JsonProperty("errorCode") int errorCode, @JsonProperty("errorMsg") String errorMsg, - @JsonProperty("tooManyEntities") boolean tooManyEntities) { + @JsonProperty("allowedEntities") long allowedEntities, + @JsonProperty("totalEntities") long totalEntities) { super(cmdId, data, update, errorCode, errorMsg); - this.tooManyEntities = tooManyEntities; + this.allowedEntities = allowedEntities; + this.totalEntities = totalEntities; } }