Refactoring to implement command unsubscribe for alarm status ws cmd

This commit is contained in:
Andrii Shvaika 2024-11-21 13:45:45 +02:00
parent c498b26485
commit 64581b4fef
16 changed files with 498 additions and 309 deletions

View File

@ -444,75 +444,23 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
}
@Override
public void handleCmd(WebSocketSessionRef sessionRef, AlarmStatusCmd cmd) {
log.debug("[{}] Handling alarm status subscription cmd (cmdId: {})", sessionRef.getSessionId(), cmd.getCmdId());
SecurityUser securityCtx = sessionRef.getSecurityCtx();
TbAlarmStatusSubscription subscription = TbAlarmStatusSubscription.builder()
.serviceId(serviceInfoProvider.getServiceId())
.sessionId(sessionRef.getSessionId())
.subscriptionId(cmd.getCmdId())
.tenantId(securityCtx.getTenantId())
.entityId(cmd.getOriginatorId())
.typeList(cmd.getTypeList())
.severityList(cmd.getSeverityList())
.updateProcessor(this::handleAlarmStatusSubscriptionUpdate)
.build();
localSubscriptionService.addSubscription(subscription, sessionRef);
fetchActiveAlarms(subscription);
sendUpdate(sessionRef.getSessionId(), subscription.createUpdate());
}
private void fetchActiveAlarms(TbAlarmStatusSubscription subscription) {
log.trace("[{}, subId: {}] Fetching active alarms from DB", subscription.getSessionId(), subscription.getSubscriptionId());
OriginatorAlarmFilter originatorAlarmFilter = new OriginatorAlarmFilter(subscription.getEntityId(), subscription.getTypeList(), subscription.getSeverityList());
List<UUID> alarmIds = alarmService.findActiveOriginatorAlarms(subscription.getTenantId(), originatorAlarmFilter, alarmsPerAlarmStatusSubscriptionCacheSize);
subscription.getAlarmIds().addAll(alarmIds);
subscription.setCacheFull(alarmIds.size() == alarmsPerAlarmStatusSubscriptionCacheSize);
}
private void sendUpdate(String sessionId, CmdUpdate update) {
log.trace("[{}, cmdId: {}] Sending WS update: {}", sessionId, update.getCmdId(), update);
wsService.sendUpdate(sessionId, update);
}
private void handleAlarmStatusSubscriptionUpdate(TbSubscription<AlarmSubscriptionUpdate> sub, AlarmSubscriptionUpdate subscriptionUpdate) {
TbAlarmStatusSubscription subscription = (TbAlarmStatusSubscription) sub;
try {
AlarmInfo alarm = subscriptionUpdate.getAlarm();
Set<UUID> alarmsIds = subscription.getAlarmIds();
if (alarmsIds.contains(alarm.getId().getId())) {
if (!subscription.matches(alarm) || subscriptionUpdate.isAlarmDeleted()) {
alarmsIds.remove(alarm.getId().getId());
if (alarmsIds.isEmpty()) {
if (subscription.isCacheFull()) {
fetchActiveAlarms(subscription);
if (alarmsIds.isEmpty()) {
sendUpdate(subscription.getSessionId(), subscription.createUpdate());
}
} else {
sendUpdate(subscription.getSessionId(), subscription.createUpdate());
}
}
}
} else if (subscription.matches(alarm)) {
if (alarmsIds.size() < alarmsPerAlarmStatusSubscriptionCacheSize) {
alarmsIds.add(alarm.getId().getId());
if (alarmsIds.size() == 1) {
sendUpdate(subscription.getSessionId(), subscription.createUpdate());
}
} else {
subscription.setCacheFull(true);
}
}
} catch (Exception e) {
log.error("[{}, subId: {}] Failed to handle update for alarm status subscription: {}", subscription.getSessionId(), subscription.getSubscriptionId(), subscriptionUpdate, e);
public void handleCmd(WebSocketSessionRef session, AlarmStatusCmd cmd) {
log.debug("[{}] Handling alarm status subscription cmd (cmdId: {})", session.getSessionId(), cmd.getCmdId());
TbAlarmStatusSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId());
if (ctx == null) {
ctx = createSubCtx(session, cmd);
long start = System.currentTimeMillis();
ctx.fetchActiveAlarms();
long end = System.currentTimeMillis();
stats.getAlarmQueryInvocationCnt().incrementAndGet();
stats.getAlarmQueryTimeSpent().addAndGet(end - start);
ctx.sendUpdate();
} else {
log.debug("[{}][{}] Received duplicate command: {}", session.getSessionId(), cmd.getCmdId(), cmd);
}
}
private boolean validate(TbAbstractSubCtx<?> finalCtx) {
private boolean validate(TbAbstractSubCtx finalCtx) {
if (finalCtx.isStopped()) {
log.warn("[{}][{}][{}] Received validation task for already stopped context.", finalCtx.getTenantId(), finalCtx.getSessionId(), finalCtx.getCmdId());
return false;
@ -528,7 +476,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
return true;
}
private void refreshDynamicQuery(TbAbstractSubCtx<?> finalCtx) {
private void refreshDynamicQuery(TbAbstractEntityQuerySubCtx<?> finalCtx) {
try {
if (validate(finalCtx)) {
long start = System.currentTimeMillis();
@ -616,6 +564,15 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
return ctx;
}
private TbAlarmStatusSubCtx createSubCtx(WebSocketSessionRef sessionRef, AlarmStatusCmd cmd) {
Map<Integer, TbAbstractSubCtx> sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new ConcurrentHashMap<>());
TbAlarmStatusSubCtx ctx = new TbAlarmStatusSubCtx(serviceId, wsService, localSubscriptionService,
stats, alarmService, alarmsPerAlarmStatusSubscriptionCacheSize, sessionRef, cmd.getCmdId());
ctx.createSubscription(cmd);
sessionSubs.put(cmd.getCmdId(), ctx);
return ctx;
}
@SuppressWarnings("unchecked")
private <T extends TbAbstractSubCtx> T getSubCtx(String sessionId, int cmdId) {
Map<Integer, TbAbstractSubCtx> sessionSubs = subscriptionsBySessionId.get(sessionId);

View File

@ -414,7 +414,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
private void onAlarmUpdate(UUID entityId, AlarmSubscriptionUpdate update, TbCallback callback) {
processSubscriptionData(entityId,
sub -> TbSubscriptionType.ALARMS.equals(sub.getType()) || TbSubscriptionType.ALARM_STATUS.equals(sub.getType()),
sub -> TbSubscriptionType.ALARMS.equals(sub.getType()),
update, callback);
}

View File

@ -44,7 +44,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends EntityDataPageLink>> extends TbAbstractSubCtx<T> {
public abstract class TbAbstractDataSubCtx<T extends AbstractDataQuery<? extends EntityDataPageLink>> extends TbAbstractEntityQuerySubCtx<T> {
protected final Map<Integer, EntityId> subToEntityIdMap;
@Getter

View File

@ -0,0 +1,295 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.subscription;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.query.ComplexFilterPredicate;
import org.thingsboard.server.common.data.query.DynamicValue;
import org.thingsboard.server.common.data.query.DynamicValueSourceType;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.FilterPredicateType;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.data.query.KeyFilterPredicate;
import org.thingsboard.server.common.data.query.SimpleKeyFilterPredicate;
import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.telemetry.sub.TelemetrySubscriptionUpdate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
@Slf4j
public abstract class TbAbstractEntityQuerySubCtx<T extends EntityCountQuery> extends TbAbstractSubCtx {
protected final EntityService entityService;
protected final AttributesService attributesService;
protected final Set<Integer> subToDynamicValueKeySet;
@Getter
protected final Map<DynamicValueKey, List<DynamicValue>> dynamicValues;
@Getter
@Setter
protected T query;
@Setter
protected volatile ScheduledFuture<?> refreshTask;
public TbAbstractEntityQuerySubCtx(String serviceId, WebSocketService wsService, EntityService entityService, TbLocalSubscriptionService localSubscriptionService,
AttributesService attributesService, SubscriptionServiceStatistics stats, WebSocketSessionRef sessionRef, int cmdId) {
super(serviceId, wsService, localSubscriptionService, stats, sessionRef, cmdId);
this.entityService = entityService;
this.attributesService = attributesService;
this.subToDynamicValueKeySet = ConcurrentHashMap.newKeySet();
this.dynamicValues = new ConcurrentHashMap<>();
}
public abstract void fetchData();
protected abstract void update();
public void clearSubscriptions() {
clearDynamicValueSubscriptions();
}
public void stop() {
super.stop();
cancelTasks();
clearSubscriptions();
}
public void setAndResolveQuery(T query) {
dynamicValues.clear();
this.query = query;
if (query != null && query.getKeyFilters() != null) {
for (KeyFilter filter : query.getKeyFilters()) {
registerDynamicValues(filter.getPredicate());
}
}
resolve(getTenantId(), getCustomerId(), getUserId());
}
public void resolve(TenantId tenantId, CustomerId customerId, UserId userId) {
List<ListenableFuture<DynamicValueKeySub>> futures = new ArrayList<>();
for (DynamicValueKey key : dynamicValues.keySet()) {
switch (key.getSourceType()) {
case CURRENT_TENANT:
futures.add(resolveEntityValue(tenantId, tenantId, key));
break;
case CURRENT_CUSTOMER:
if (customerId != null && !customerId.isNullUid()) {
futures.add(resolveEntityValue(tenantId, customerId, key));
}
break;
case CURRENT_USER:
if (userId != null && !userId.isNullUid()) {
futures.add(resolveEntityValue(tenantId, userId, key));
}
break;
}
}
try {
Map<EntityId, Map<String, DynamicValueKeySub>> tmpSubMap = new HashMap<>();
for (DynamicValueKeySub sub : Futures.successfulAsList(futures).get()) {
tmpSubMap.computeIfAbsent(sub.getEntityId(), tmp -> new HashMap<>()).put(sub.getKey().getSourceAttribute(), sub);
}
for (EntityId entityId : tmpSubMap.keySet()) {
Map<String, Long> keyStates = new HashMap<>();
Map<String, DynamicValueKeySub> dynamicValueKeySubMap = tmpSubMap.get(entityId);
dynamicValueKeySubMap.forEach((k, v) -> keyStates.put(k, v.getLastUpdateTs()));
int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
TbAttributeSubscription sub = TbAttributeSubscription.builder()
.serviceId(serviceId)
.sessionId(sessionRef.getSessionId())
.subscriptionId(subIdx)
.tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId)
.updateProcessor((subscription, subscriptionUpdate) -> dynamicValueSubUpdate(subscription.getSessionId(), subscriptionUpdate, dynamicValueKeySubMap))
.queryTs(createdTime)
.allKeys(false)
.keyStates(keyStates)
.scope(TbAttributeSubscriptionScope.SERVER_SCOPE)
.build();
subToDynamicValueKeySet.add(subIdx);
localSubscriptionService.addSubscription(sub, sessionRef);
}
} catch (InterruptedException | ExecutionException e) {
log.info("[{}][{}][{}] Failed to resolve dynamic values: {}", tenantId, customerId, userId, dynamicValues.keySet());
}
}
private void dynamicValueSubUpdate(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate,
Map<String, DynamicValueKeySub> dynamicValueKeySubMap) {
Map<String, TsValue> latestUpdate = new HashMap<>();
subscriptionUpdate.getData().forEach((k, v) -> {
Object[] data = (Object[]) v.get(0);
latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1]));
});
boolean invalidateFilter = false;
for (Map.Entry<String, TsValue> entry : latestUpdate.entrySet()) {
String k = entry.getKey();
TsValue tsValue = entry.getValue();
DynamicValueKeySub sub = dynamicValueKeySubMap.get(k);
if (sub.updateValue(tsValue)) {
invalidateFilter = true;
updateDynamicValuesByKey(sub, tsValue);
}
}
if (invalidateFilter) {
update();
}
}
@Data
private static class DynamicValueKeySub {
private final DynamicValueKey key;
private final EntityId entityId;
private long lastUpdateTs;
private String lastUpdateValue;
boolean updateValue(TsValue value) {
if (value.getTs() > lastUpdateTs && (lastUpdateValue == null || !lastUpdateValue.equals(value.getValue()))) {
this.lastUpdateTs = value.getTs();
this.lastUpdateValue = value.getValue();
return true;
} else {
return false;
}
}
}
private ListenableFuture<DynamicValueKeySub> resolveEntityValue(TenantId tenantId, EntityId entityId, DynamicValueKey key) {
ListenableFuture<Optional<AttributeKvEntry>> entry = attributesService.find(tenantId, entityId,
AttributeScope.SERVER_SCOPE, key.getSourceAttribute());
return Futures.transform(entry, attributeOpt -> {
DynamicValueKeySub sub = new DynamicValueKeySub(key, entityId);
if (attributeOpt.isPresent()) {
AttributeKvEntry attribute = attributeOpt.get();
sub.setLastUpdateTs(attribute.getLastUpdateTs());
sub.setLastUpdateValue(attribute.getValueAsString());
updateDynamicValuesByKey(sub, new TsValue(attribute.getLastUpdateTs(), attribute.getValueAsString()));
}
return sub;
}, MoreExecutors.directExecutor());
}
@SuppressWarnings("unchecked")
protected void updateDynamicValuesByKey(DynamicValueKeySub sub, TsValue tsValue) {
DynamicValueKey dvk = sub.getKey();
switch (dvk.getPredicateType()) {
case STRING:
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(tsValue.getValue()));
break;
case NUMERIC:
try {
Double dValue = Double.parseDouble(tsValue.getValue());
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(dValue));
} catch (NumberFormatException e) {
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(null));
}
break;
case BOOLEAN:
Boolean bValue = Boolean.parseBoolean(tsValue.getValue());
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(bValue));
break;
}
}
@SuppressWarnings("unchecked")
private void registerDynamicValues(KeyFilterPredicate predicate) {
switch (predicate.getType()) {
case STRING:
case NUMERIC:
case BOOLEAN:
Optional<DynamicValue> value = getDynamicValueFromSimplePredicate((SimpleKeyFilterPredicate) predicate);
if (value.isPresent()) {
DynamicValue dynamicValue = value.get();
DynamicValueKey key = new DynamicValueKey(
predicate.getType(),
dynamicValue.getSourceType(),
dynamicValue.getSourceAttribute());
dynamicValues.computeIfAbsent(key, tmp -> new ArrayList<>()).add(dynamicValue);
}
break;
case COMPLEX:
((ComplexFilterPredicate) predicate).getPredicates().forEach(this::registerDynamicValues);
}
}
private Optional<DynamicValue<T>> getDynamicValueFromSimplePredicate(SimpleKeyFilterPredicate<T> predicate) {
if (predicate.getValue().getUserValue() == null) {
return Optional.ofNullable(predicate.getValue().getDynamicValue());
} else {
return Optional.empty();
}
}
protected void clearDynamicValueSubscriptions() {
if (subToDynamicValueKeySet != null) {
for (Integer subId : subToDynamicValueKeySet) {
localSubscriptionService.cancelSubscription(getTenantId(), sessionRef.getSessionId(), subId);
}
subToDynamicValueKeySet.clear();
}
}
public void setRefreshTask(ScheduledFuture<?> task) {
if (!stopped) {
this.refreshTask = task;
} else {
task.cancel(true);
}
}
public void cancelTasks() {
if (this.refreshTask != null) {
log.trace("[{}][{}] Canceling old refresh task", sessionRef.getSessionId(), cmdId);
this.refreshTask.cancel(true);
}
}
@Data
public static class DynamicValueKey {
@Getter
private final FilterPredicateType predicateType;
@Getter
private final DynamicValueSourceType sourceType;
@Getter
private final String sourceAttribute;
}
}

View File

@ -58,230 +58,37 @@ import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@Data
public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
public abstract class TbAbstractSubCtx {
@Getter
protected final Lock wsLock = new ReentrantLock(true);
protected final String serviceId;
protected final SubscriptionServiceStatistics stats;
private final WebSocketService wsService;
protected final EntityService entityService;
protected final TbLocalSubscriptionService localSubscriptionService;
protected final AttributesService attributesService;
protected final WebSocketSessionRef sessionRef;
protected final int cmdId;
protected final Set<Integer> subToDynamicValueKeySet;
@Getter
protected final Map<DynamicValueKey, List<DynamicValue>> dynamicValues;
@Getter
@Setter
protected T query;
@Setter
protected volatile ScheduledFuture<?> refreshTask;
protected volatile boolean stopped;
@Getter
protected long createdTime;
public TbAbstractSubCtx(String serviceId, WebSocketService wsService,
EntityService entityService, TbLocalSubscriptionService localSubscriptionService,
AttributesService attributesService, SubscriptionServiceStatistics stats,
TbLocalSubscriptionService localSubscriptionService,
SubscriptionServiceStatistics stats,
WebSocketSessionRef sessionRef, int cmdId) {
this.createdTime = System.currentTimeMillis();
this.serviceId = serviceId;
this.wsService = wsService;
this.entityService = entityService;
this.localSubscriptionService = localSubscriptionService;
this.attributesService = attributesService;
this.stats = stats;
this.sessionRef = sessionRef;
this.cmdId = cmdId;
this.subToDynamicValueKeySet = ConcurrentHashMap.newKeySet();
this.dynamicValues = new ConcurrentHashMap<>();
}
public void setAndResolveQuery(T query) {
dynamicValues.clear();
this.query = query;
if (query != null && query.getKeyFilters() != null) {
for (KeyFilter filter : query.getKeyFilters()) {
registerDynamicValues(filter.getPredicate());
}
}
resolve(getTenantId(), getCustomerId(), getUserId());
}
public void resolve(TenantId tenantId, CustomerId customerId, UserId userId) {
List<ListenableFuture<DynamicValueKeySub>> futures = new ArrayList<>();
for (DynamicValueKey key : dynamicValues.keySet()) {
switch (key.getSourceType()) {
case CURRENT_TENANT:
futures.add(resolveEntityValue(tenantId, tenantId, key));
break;
case CURRENT_CUSTOMER:
if (customerId != null && !customerId.isNullUid()) {
futures.add(resolveEntityValue(tenantId, customerId, key));
}
break;
case CURRENT_USER:
if (userId != null && !userId.isNullUid()) {
futures.add(resolveEntityValue(tenantId, userId, key));
}
break;
}
}
try {
Map<EntityId, Map<String, DynamicValueKeySub>> tmpSubMap = new HashMap<>();
for (DynamicValueKeySub sub : Futures.successfulAsList(futures).get()) {
tmpSubMap.computeIfAbsent(sub.getEntityId(), tmp -> new HashMap<>()).put(sub.getKey().getSourceAttribute(), sub);
}
for (EntityId entityId : tmpSubMap.keySet()) {
Map<String, Long> keyStates = new HashMap<>();
Map<String, DynamicValueKeySub> dynamicValueKeySubMap = tmpSubMap.get(entityId);
dynamicValueKeySubMap.forEach((k, v) -> keyStates.put(k, v.getLastUpdateTs()));
int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet();
TbAttributeSubscription sub = TbAttributeSubscription.builder()
.serviceId(serviceId)
.sessionId(sessionRef.getSessionId())
.subscriptionId(subIdx)
.tenantId(sessionRef.getSecurityCtx().getTenantId())
.entityId(entityId)
.updateProcessor((subscription, subscriptionUpdate) -> dynamicValueSubUpdate(subscription.getSessionId(), subscriptionUpdate, dynamicValueKeySubMap))
.queryTs(createdTime)
.allKeys(false)
.keyStates(keyStates)
.scope(TbAttributeSubscriptionScope.SERVER_SCOPE)
.build();
subToDynamicValueKeySet.add(subIdx);
localSubscriptionService.addSubscription(sub, sessionRef);
}
} catch (InterruptedException | ExecutionException e) {
log.info("[{}][{}][{}] Failed to resolve dynamic values: {}", tenantId, customerId, userId, dynamicValues.keySet());
}
}
private void dynamicValueSubUpdate(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate,
Map<String, DynamicValueKeySub> dynamicValueKeySubMap) {
Map<String, TsValue> latestUpdate = new HashMap<>();
subscriptionUpdate.getData().forEach((k, v) -> {
Object[] data = (Object[]) v.get(0);
latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1]));
});
boolean invalidateFilter = false;
for (Map.Entry<String, TsValue> entry : latestUpdate.entrySet()) {
String k = entry.getKey();
TsValue tsValue = entry.getValue();
DynamicValueKeySub sub = dynamicValueKeySubMap.get(k);
if (sub.updateValue(tsValue)) {
invalidateFilter = true;
updateDynamicValuesByKey(sub, tsValue);
}
}
if (invalidateFilter) {
update();
}
}
public abstract boolean isDynamic();
public abstract void fetchData();
protected abstract void update();
public void clearSubscriptions() {
clearDynamicValueSubscriptions();
}
public void stop() {
stopped = true;
cancelTasks();
clearSubscriptions();
}
@Data
private static class DynamicValueKeySub {
private final DynamicValueKey key;
private final EntityId entityId;
private long lastUpdateTs;
private String lastUpdateValue;
boolean updateValue(TsValue value) {
if (value.getTs() > lastUpdateTs && (lastUpdateValue == null || !lastUpdateValue.equals(value.getValue()))) {
this.lastUpdateTs = value.getTs();
this.lastUpdateValue = value.getValue();
return true;
} else {
return false;
}
}
}
private ListenableFuture<DynamicValueKeySub> resolveEntityValue(TenantId tenantId, EntityId entityId, DynamicValueKey key) {
ListenableFuture<Optional<AttributeKvEntry>> entry = attributesService.find(tenantId, entityId,
AttributeScope.SERVER_SCOPE, key.getSourceAttribute());
return Futures.transform(entry, attributeOpt -> {
DynamicValueKeySub sub = new DynamicValueKeySub(key, entityId);
if (attributeOpt.isPresent()) {
AttributeKvEntry attribute = attributeOpt.get();
sub.setLastUpdateTs(attribute.getLastUpdateTs());
sub.setLastUpdateValue(attribute.getValueAsString());
updateDynamicValuesByKey(sub, new TsValue(attribute.getLastUpdateTs(), attribute.getValueAsString()));
}
return sub;
}, MoreExecutors.directExecutor());
}
@SuppressWarnings("unchecked")
protected void updateDynamicValuesByKey(DynamicValueKeySub sub, TsValue tsValue) {
DynamicValueKey dvk = sub.getKey();
switch (dvk.getPredicateType()) {
case STRING:
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(tsValue.getValue()));
break;
case NUMERIC:
try {
Double dValue = Double.parseDouble(tsValue.getValue());
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(dValue));
} catch (NumberFormatException e) {
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(null));
}
break;
case BOOLEAN:
Boolean bValue = Boolean.parseBoolean(tsValue.getValue());
dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(bValue));
break;
}
}
@SuppressWarnings("unchecked")
private void registerDynamicValues(KeyFilterPredicate predicate) {
switch (predicate.getType()) {
case STRING:
case NUMERIC:
case BOOLEAN:
Optional<DynamicValue> value = getDynamicValueFromSimplePredicate((SimpleKeyFilterPredicate) predicate);
if (value.isPresent()) {
DynamicValue dynamicValue = value.get();
DynamicValueKey key = new DynamicValueKey(
predicate.getType(),
dynamicValue.getSourceType(),
dynamicValue.getSourceAttribute());
dynamicValues.computeIfAbsent(key, tmp -> new ArrayList<>()).add(dynamicValue);
}
break;
case COMPLEX:
((ComplexFilterPredicate) predicate).getPredicates().forEach(this::registerDynamicValues);
}
}
private Optional<DynamicValue<T>> getDynamicValueFromSimplePredicate(SimpleKeyFilterPredicate<T> predicate) {
if (predicate.getValue().getUserValue() == null) {
return Optional.ofNullable(predicate.getValue().getDynamicValue());
} else {
return Optional.empty();
}
}
public String getSessionId() {
@ -300,40 +107,6 @@ public abstract class TbAbstractSubCtx<T extends EntityCountQuery> {
return sessionRef.getSecurityCtx().getId();
}
protected void clearDynamicValueSubscriptions() {
if (subToDynamicValueKeySet != null) {
for (Integer subId : subToDynamicValueKeySet) {
localSubscriptionService.cancelSubscription(getTenantId(), sessionRef.getSessionId(), subId);
}
subToDynamicValueKeySet.clear();
}
}
public void setRefreshTask(ScheduledFuture<?> task) {
if (!stopped) {
this.refreshTask = task;
} else {
task.cancel(true);
}
}
public void cancelTasks() {
if (this.refreshTask != null) {
log.trace("[{}][{}] Canceling old refresh task", sessionRef.getSessionId(), cmdId);
this.refreshTask.cancel(true);
}
}
@Data
public static class DynamicValueKey {
@Getter
private final FilterPredicateType predicateType;
@Getter
private final DynamicValueSourceType sourceType;
@Getter
private final String sourceAttribute;
}
public void sendWsMsg(CmdUpdate update) {
wsLock.lock();
try {

View File

@ -29,7 +29,7 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate;
@Slf4j
@ToString(callSuper = true)
public class TbAlarmCountSubCtx extends TbAbstractSubCtx<AlarmCountQuery> {
public class TbAlarmCountSubCtx extends TbAbstractEntityQuerySubCtx<AlarmCountQuery> {
private final AlarmService alarmService;

View File

@ -93,6 +93,11 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
this.alarmsMap = new HashMap<>();
}
@Override
public void clearSubscriptions() {
super.clearSubscriptions();
}
public void fetchAlarms() {
alarmInvocationAttempts++;
log.trace("[{}] Fetching alarms: {}", cmdId, alarmInvocationAttempts);

View File

@ -0,0 +1,129 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.subscription;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.query.AlarmCountQuery;
import org.thingsboard.server.common.data.query.OriginatorAlarmFilter;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmCountUpdate;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AlarmStatusCmd;
import org.thingsboard.server.service.ws.telemetry.sub.AlarmSubscriptionUpdate;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@Slf4j
@ToString(callSuper = true)
public class TbAlarmStatusSubCtx extends TbAbstractSubCtx {
private final AlarmService alarmService;
private final int alarmsPerAlarmStatusSubscriptionCacheSize;
private volatile TbAlarmStatusSubscription subscription;
public TbAlarmStatusSubCtx(String serviceId, WebSocketService wsService,
TbLocalSubscriptionService localSubscriptionService,
SubscriptionServiceStatistics stats, AlarmService alarmService,
int alarmsPerAlarmStatusSubscriptionCacheSize,
WebSocketSessionRef sessionRef, int cmdId) {
super(serviceId, wsService, localSubscriptionService, stats, sessionRef, cmdId);
this.alarmService = alarmService;
this.alarmsPerAlarmStatusSubscriptionCacheSize = alarmsPerAlarmStatusSubscriptionCacheSize;
}
@Override
public boolean isDynamic() {
return false;
}
@Override
public void stop() {
super.stop();
localSubscriptionService.cancelSubscription(getTenantId(), sessionRef.getSessionId(), subscription.getSubscriptionId());
}
public void createSubscription(AlarmStatusCmd cmd) {
SecurityUser securityCtx = sessionRef.getSecurityCtx();
subscription = TbAlarmStatusSubscription.builder()
.serviceId(serviceId)
.sessionId(sessionRef.getSessionId())
.subscriptionId(sessionRef.getSessionSubIdSeq().incrementAndGet())
.tenantId(securityCtx.getTenantId())
.entityId(cmd.getOriginatorId())
.typeList(cmd.getTypeList())
.severityList(cmd.getSeverityList())
.updateProcessor(this::handleAlarmStatusSubscriptionUpdate)
.build();
localSubscriptionService.addSubscription(subscription, sessionRef);
}
public void sendUpdate() {
sendWsMsg(subscription.createUpdate());
}
public void fetchActiveAlarms() {
log.trace("[{}, subId: {}] Fetching active alarms from DB", subscription.getSessionId(), subscription.getSubscriptionId());
OriginatorAlarmFilter originatorAlarmFilter = new OriginatorAlarmFilter(subscription.getEntityId(), subscription.getTypeList(), subscription.getSeverityList());
List<UUID> alarmIds = alarmService.findActiveOriginatorAlarms(subscription.getTenantId(), originatorAlarmFilter, alarmsPerAlarmStatusSubscriptionCacheSize);
subscription.getAlarmIds().addAll(alarmIds);
subscription.setHasMoreAlarmsInDB(alarmIds.size() == alarmsPerAlarmStatusSubscriptionCacheSize);
}
private void handleAlarmStatusSubscriptionUpdate(TbSubscription<AlarmSubscriptionUpdate> sub, AlarmSubscriptionUpdate subscriptionUpdate) {
try {
AlarmInfo alarm = subscriptionUpdate.getAlarm();
Set<UUID> alarmsIds = subscription.getAlarmIds();
if (alarmsIds.contains(alarm.getId().getId())) {
if (!subscription.matches(alarm) || subscriptionUpdate.isAlarmDeleted()) {
alarmsIds.remove(alarm.getId().getId());
if (alarmsIds.isEmpty()) {
if (subscription.isHasMoreAlarmsInDB()) {
fetchActiveAlarms();
if (alarmsIds.isEmpty()) {
sendUpdate();
}
} else {
sendUpdate();
}
}
}
} else if (subscription.matches(alarm)) {
if (alarmsIds.size() < alarmsPerAlarmStatusSubscriptionCacheSize) {
alarmsIds.add(alarm.getId().getId());
if (alarmsIds.size() == 1) {
sendUpdate();
}
} else {
subscription.setHasMoreAlarmsInDB(true);
}
}
} catch (Exception e) {
log.error("[{}, subId: {}] Failed to handle update for alarm status subscription: {}", subscription.getSessionId(), subscription.getSubscriptionId(), subscriptionUpdate, e);
}
}
}

View File

@ -38,7 +38,7 @@ public class TbAlarmStatusSubscription extends TbSubscription<AlarmSubscriptionU
private final Set<UUID> alarmIds = new HashSet<>();
@Getter
@Setter
private boolean cacheFull;
private boolean hasMoreAlarmsInDB;
@Getter
private final List<String> typeList;
@Getter
@ -48,7 +48,7 @@ public class TbAlarmStatusSubscription extends TbSubscription<AlarmSubscriptionU
public TbAlarmStatusSubscription(String serviceId, String sessionId, int subscriptionId, TenantId tenantId, EntityId entityId,
BiConsumer<TbSubscription<AlarmSubscriptionUpdate>, AlarmSubscriptionUpdate> updateProcessor,
List<String> typeList, List<AlarmSeverity> severityList) {
super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.ALARM_STATUS, updateProcessor);
super(serviceId, sessionId, subscriptionId, tenantId, entityId, TbSubscriptionType.ALARMS, updateProcessor);
this.typeList = typeList;
this.severityList = severityList;
}
@ -56,7 +56,7 @@ public class TbAlarmStatusSubscription extends TbSubscription<AlarmSubscriptionU
public AlarmStatusUpdate createUpdate() {
return AlarmStatusUpdate.builder()
.cmdId(getSubscriptionId())
.active(alarmIds.size() > 0)
.active(!alarmIds.isEmpty())
.build();
}

View File

@ -24,7 +24,7 @@ import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.EntityCountUpdate;
@Slf4j
public class TbEntityCountSubCtx extends TbAbstractSubCtx<EntityCountQuery> {
public class TbEntityCountSubCtx extends TbAbstractEntityQuerySubCtx<EntityCountQuery> {
private volatile int result;

View File

@ -74,7 +74,6 @@ public class TbEntityLocalSubsInfo {
stateChanged = true;
}
break;
case ALARM_STATUS:
case ALARMS:
if (!newState.alarms) {
newState.alarms = true;
@ -169,7 +168,6 @@ public class TbEntityLocalSubsInfo {
case NOTIFICATIONS_COUNT:
state.notifications = false;
break;
case ALARM_STATUS:
case ALARMS:
state.alarms = false;
break;

View File

@ -16,5 +16,5 @@
package org.thingsboard.server.service.subscription;
public enum TbSubscriptionType {
TIMESERIES, ATTRIBUTES, ALARMS, ALARM_STATUS, NOTIFICATIONS, NOTIFICATIONS_COUNT
TIMESERIES, ATTRIBUTES, ALARMS, NOTIFICATIONS, NOTIFICATIONS_COUNT
}

View File

@ -174,6 +174,7 @@ public class DefaultWebSocketService implements WebSocketService {
cmdsHandlers.put(WsCmdType.ALARM_DATA_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
cmdsHandlers.put(WsCmdType.ENTITY_COUNT_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
cmdsHandlers.put(WsCmdType.ALARM_COUNT_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
cmdsHandlers.put(WsCmdType.ALARM_STATUS_UNSUBSCRIBE, newCmdHandler(this::handleWsDataUnsubscribeCmd));
cmdsHandlers.put(WsCmdType.NOTIFICATIONS, newCmdHandler(notificationCmdsHandler::handleUnreadNotificationsSubCmd));
cmdsHandlers.put(WsCmdType.NOTIFICATIONS_COUNT, newCmdHandler(notificationCmdsHandler::handleUnreadNotificationsCountSubCmd));
cmdsHandlers.put(WsCmdType.MARK_NOTIFICATIONS_AS_READ, newCmdHandler(notificationCmdsHandler::handleMarkAsReadCmd));

View File

@ -25,6 +25,7 @@ public enum WsCmdType {
ENTITY_COUNT,
ALARM_DATA,
ALARM_COUNT,
ALARM_STATUS,
NOTIFICATIONS,
NOTIFICATIONS_COUNT,
@ -36,5 +37,5 @@ public enum WsCmdType {
ENTITY_DATA_UNSUBSCRIBE,
ENTITY_COUNT_UNSUBSCRIBE,
NOTIFICATIONS_UNSUBSCRIBE,
ALARM_STATUS
ALARM_STATUS_UNSUBSCRIBE
}

View File

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.ws.telemetry.cmd.v2;
import lombok.Data;
import org.thingsboard.server.service.ws.WsCmdType;
@Data
public class AlarmStatusUnsubscribeCmd implements UnsubscribeCmd {
private final int cmdId;
@Override
public WsCmdType getType() {
return WsCmdType.ALARM_STATUS_UNSUBSCRIBE;
}
}

View File

@ -428,7 +428,7 @@ public class WebsocketApiTest extends AbstractControllerTest {
String alarmId = alarms.get(i).getId().getId().toString();
doPost("/api/alarm/" + alarmId + "/clear", Alarm.class);
}
AlarmStatusUpdate alarmStatusUpdate = JacksonUtil.fromString(getWsClient().waitForUpdate(), AlarmStatusUpdate.class);
AlarmStatusUpdate alarmStatusUpdate = JacksonUtil.fromString(getWsClient().waitForUpdate(TimeUnit.SECONDS.toMillis(5)), AlarmStatusUpdate.class);
Assert.assertNull(alarmStatusUpdate);
//clear 6-th alarm should send update