Alarm asignee updates
This commit is contained in:
parent
33ecbb238b
commit
8b6950031e
@ -26,7 +26,6 @@ import org.thingsboard.common.util.JacksonUtil;
|
|||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.rpc.RpcError;
|
import org.thingsboard.server.common.data.rpc.RpcError;
|
||||||
@ -36,6 +35,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
|||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
|
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
|
||||||
import org.thingsboard.server.common.stats.StatsFactory;
|
import org.thingsboard.server.common.stats.StatsFactory;
|
||||||
|
import org.thingsboard.server.common.data.alarm.AlarmAssigneeUpdate;
|
||||||
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
|
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
|
||||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
||||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||||
@ -503,7 +503,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
|||||||
subscriptionManagerService.onAlarmUpdate(
|
subscriptionManagerService.onAlarmUpdate(
|
||||||
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
|
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
|
||||||
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
|
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
|
||||||
JacksonUtil.fromString(proto.getAlarm(), Alarm.class),callback);
|
JacksonUtil.fromString(proto.getAlarm(), Alarm.class),
|
||||||
|
JacksonUtil.fromString(proto.getAssignee(), AlarmAssigneeUpdate.class),
|
||||||
|
callback);
|
||||||
} else if (msg.hasAlarmDelete()) {
|
} else if (msg.hasAlarmDelete()) {
|
||||||
TbAlarmDeleteProto proto = msg.getAlarmDelete();
|
TbAlarmDeleteProto proto = msg.getAlarmDelete();
|
||||||
subscriptionManagerService.onAlarmDeleted(
|
subscriptionManagerService.onAlarmDeleted(
|
||||||
|
|||||||
@ -26,7 +26,6 @@ import org.thingsboard.server.cluster.TbClusterService;
|
|||||||
import org.thingsboard.server.common.data.DataConstants;
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
@ -41,6 +40,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
|
|||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
|
import org.thingsboard.server.common.data.alarm.AlarmAssigneeUpdate;
|
||||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
|
import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
|
||||||
@ -269,7 +269,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
|||||||
updateDeviceInactivityTimeout(tenantId, entityId, attributes);
|
updateDeviceInactivityTimeout(tenantId, entityId, attributes);
|
||||||
} else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) {
|
} else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) {
|
||||||
clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
|
clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
|
||||||
new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes))
|
new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes))
|
||||||
, null);
|
, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -293,7 +293,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback) {
|
public void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm, AlarmAssigneeUpdate assignee, TbCallback callback) {
|
||||||
onLocalAlarmSubUpdate(entityId,
|
onLocalAlarmSubUpdate(entityId,
|
||||||
s -> {
|
s -> {
|
||||||
if (TbSubscriptionType.ALARMS.equals(s.getType())) {
|
if (TbSubscriptionType.ALARMS.equals(s.getType())) {
|
||||||
@ -303,14 +303,13 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
s -> alarm.getCreatedTime() >= s.getTs() || alarm.getAssignTs() >= s.getTs(),
|
s -> alarm.getCreatedTime() >= s.getTs() || alarm.getAssignTs() >= s.getTs(),
|
||||||
s -> alarm,
|
alarm, assignee, false
|
||||||
false
|
|
||||||
);
|
);
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onAlarmDeleted(TenantId tenantId, EntityId entityId, Alarm alarmInfo, TbCallback callback) {
|
public void onAlarmDeleted(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback) {
|
||||||
onLocalAlarmSubUpdate(entityId,
|
onLocalAlarmSubUpdate(entityId,
|
||||||
s -> {
|
s -> {
|
||||||
if (TbSubscriptionType.ALARMS.equals(s.getType())) {
|
if (TbSubscriptionType.ALARMS.equals(s.getType())) {
|
||||||
@ -319,9 +318,8 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
s -> alarmInfo.getCreatedTime() >= s.getTs(),
|
s -> alarm.getCreatedTime() >= s.getTs(),
|
||||||
s -> alarmInfo,
|
alarm, null, true
|
||||||
true
|
|
||||||
);
|
);
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
}
|
}
|
||||||
@ -355,7 +353,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
|||||||
deleteDeviceInactivityTimeout(tenantId, entityId, keys);
|
deleteDeviceInactivityTimeout(tenantId, entityId, keys);
|
||||||
} else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) {
|
} else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) {
|
||||||
clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(tenantId,
|
clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(tenantId,
|
||||||
new DeviceId(entityId.getId()), scope, keys), null);
|
new DeviceId(entityId.getId()), scope, keys), null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
@ -415,20 +413,21 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
|||||||
private void onLocalAlarmSubUpdate(EntityId entityId,
|
private void onLocalAlarmSubUpdate(EntityId entityId,
|
||||||
Function<TbSubscription, TbAlarmsSubscription> castFunction,
|
Function<TbSubscription, TbAlarmsSubscription> castFunction,
|
||||||
Predicate<TbAlarmsSubscription> filterFunction,
|
Predicate<TbAlarmsSubscription> filterFunction,
|
||||||
Function<TbAlarmsSubscription, Alarm> processFunction,
|
Alarm alarm, AlarmAssigneeUpdate assignee,
|
||||||
boolean deleted) {
|
boolean deleted) {
|
||||||
Set<TbSubscription> entitySubscriptions = subscriptionsByEntityId.get(entityId);
|
Set<TbSubscription> entitySubscriptions = subscriptionsByEntityId.get(entityId);
|
||||||
|
if (alarm == null) {
|
||||||
|
log.warn("[{}] empty alarm update!", entityId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (entitySubscriptions != null) {
|
if (entitySubscriptions != null) {
|
||||||
entitySubscriptions.stream().map(castFunction).filter(Objects::nonNull).filter(filterFunction).forEach(s -> {
|
entitySubscriptions.stream().map(castFunction).filter(Objects::nonNull).filter(filterFunction).forEach(s -> {
|
||||||
Alarm alarm = processFunction.apply(s);
|
if (serviceId.equals(s.getServiceId())) {
|
||||||
if (alarm != null) {
|
AlarmSubscriptionUpdate update = new AlarmSubscriptionUpdate(s.getSubscriptionId(), alarm, assignee, deleted);
|
||||||
if (serviceId.equals(s.getServiceId())) {
|
localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY);
|
||||||
AlarmSubscriptionUpdate update = new AlarmSubscriptionUpdate(s.getSubscriptionId(), alarm, deleted);
|
} else {
|
||||||
localSubscriptionService.onSubscriptionUpdate(s.getSessionId(), update, TbCallback.EMPTY);
|
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId());
|
||||||
} else {
|
toCoreNotificationsProducer.send(tpi, toProto(s, alarm, assignee, deleted), null);
|
||||||
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, s.getServiceId());
|
|
||||||
toCoreNotificationsProducer.send(tpi, toProto(s, alarm, deleted), null);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
@ -559,22 +558,25 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
|||||||
});
|
});
|
||||||
|
|
||||||
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg(
|
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg(
|
||||||
LocalSubscriptionServiceMsgProto.newBuilder().setSubUpdate(builder.build()).build())
|
LocalSubscriptionServiceMsgProto.newBuilder().setSubUpdate(builder.build()).build())
|
||||||
.build();
|
.build();
|
||||||
return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg);
|
return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
private TbProtoQueueMsg<ToCoreNotificationMsg> toProto(TbSubscription subscription, Alarm alarm, boolean deleted) {
|
private TbProtoQueueMsg<ToCoreNotificationMsg> toProto(TbSubscription subscription, Alarm alarm, AlarmAssigneeUpdate assignee, boolean deleted) {
|
||||||
TbAlarmSubscriptionUpdateProto.Builder builder = TbAlarmSubscriptionUpdateProto.newBuilder();
|
TbAlarmSubscriptionUpdateProto.Builder builder = TbAlarmSubscriptionUpdateProto.newBuilder();
|
||||||
|
|
||||||
builder.setSessionId(subscription.getSessionId());
|
builder.setSessionId(subscription.getSessionId());
|
||||||
builder.setSubscriptionId(subscription.getSubscriptionId());
|
builder.setSubscriptionId(subscription.getSubscriptionId());
|
||||||
builder.setAlarm(JacksonUtil.toString(alarm));
|
builder.setAlarm(JacksonUtil.toString(alarm));
|
||||||
|
if (assignee != null) {
|
||||||
|
builder.setAssignee(JacksonUtil.toString(assignee));
|
||||||
|
}
|
||||||
builder.setDeleted(deleted);
|
builder.setDeleted(deleted);
|
||||||
|
|
||||||
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg(
|
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToLocalSubscriptionServiceMsg(
|
||||||
LocalSubscriptionServiceMsgProto.newBuilder()
|
LocalSubscriptionServiceMsgProto.newBuilder()
|
||||||
.setAlarmSubUpdate(builder.build()).build())
|
.setAlarmSubUpdate(builder.build()).build())
|
||||||
.build();
|
.build();
|
||||||
return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg);
|
return new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,12 +17,12 @@ package org.thingsboard.server.service.subscription;
|
|||||||
|
|
||||||
import org.springframework.context.ApplicationListener;
|
import org.springframework.context.ApplicationListener;
|
||||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
|
import org.thingsboard.server.common.data.alarm.AlarmAssigneeUpdate;
|
||||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -43,7 +43,7 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio
|
|||||||
|
|
||||||
void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback);
|
void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback);
|
||||||
|
|
||||||
void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback);
|
void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm, AlarmAssigneeUpdate assignee, TbCallback callback);
|
||||||
|
|
||||||
void onAlarmDeleted(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback);
|
void onAlarmDeleted(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback);
|
||||||
|
|
||||||
|
|||||||
@ -224,7 +224,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx<AlarmDataQuery> {
|
|||||||
boolean matchesFilter = filter(alarm);
|
boolean matchesFilter = filter(alarm);
|
||||||
if (onCurrentPage) {
|
if (onCurrentPage) {
|
||||||
if (matchesFilter) {
|
if (matchesFilter) {
|
||||||
AlarmData updated = current.update(alarm);
|
AlarmData updated = current.update(alarm, subscriptionUpdate.getAssignee());
|
||||||
updated.getLatest().putAll(current.getLatest());
|
updated.getLatest().putAll(current.getLatest());
|
||||||
alarmsMap.put(alarmId, updated);
|
alarmsMap.put(alarmId, updated);
|
||||||
sendWsMsg(new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated), maxEntitiesPerAlarmSubscription, data.getTotalElements()));
|
sendWsMsg(new AlarmDataUpdate(cmdId, null, Collections.singletonList(updated), maxEntitiesPerAlarmSubscription, data.getTotalElements()));
|
||||||
|
|||||||
@ -17,6 +17,7 @@ package org.thingsboard.server.service.subscription;
|
|||||||
|
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||||
|
import org.thingsboard.server.common.data.alarm.AlarmAssigneeUpdate;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||||
@ -191,7 +192,8 @@ public class TbSubscriptionUtils {
|
|||||||
return new AlarmSubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
|
return new AlarmSubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg());
|
||||||
} else {
|
} else {
|
||||||
Alarm alarm = JacksonUtil.fromString(proto.getAlarm(), Alarm.class);
|
Alarm alarm = JacksonUtil.fromString(proto.getAlarm(), Alarm.class);
|
||||||
return new AlarmSubscriptionUpdate(proto.getSubscriptionId(), alarm);
|
AlarmAssigneeUpdate assigneeUpdate = JacksonUtil.fromString(proto.getAlarm(), AlarmAssigneeUpdate.class);
|
||||||
|
return new AlarmSubscriptionUpdate(proto.getSubscriptionId(), alarm, assigneeUpdate, proto.getDeleted());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -317,7 +319,7 @@ public class TbSubscriptionUtils {
|
|||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ToCoreMsg toAlarmUpdateProto(TenantId tenantId, EntityId entityId, Alarm alarm) {
|
public static ToCoreMsg toAlarmUpdateProto(TenantId tenantId, EntityId entityId, AlarmAssigneeUpdate assigneeUpdate, Alarm alarm) {
|
||||||
TbAlarmUpdateProto.Builder builder = TbAlarmUpdateProto.newBuilder();
|
TbAlarmUpdateProto.Builder builder = TbAlarmUpdateProto.newBuilder();
|
||||||
builder.setEntityType(entityId.getEntityType().name());
|
builder.setEntityType(entityId.getEntityType().name());
|
||||||
builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
|
builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
|
||||||
@ -325,6 +327,7 @@ public class TbSubscriptionUtils {
|
|||||||
builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
|
builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
|
||||||
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
|
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
|
||||||
builder.setAlarm(JacksonUtil.toString(alarm));
|
builder.setAlarm(JacksonUtil.toString(alarm));
|
||||||
|
builder.setAssignee(JacksonUtil.toString(assigneeUpdate));
|
||||||
SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
|
SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
|
||||||
msgBuilder.setAlarmUpdate(builder);
|
msgBuilder.setAlarmUpdate(builder);
|
||||||
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
|
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
|
||||||
|
|||||||
@ -211,12 +211,12 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService
|
|||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
|
||||||
if (currentPartitions.contains(tpi)) {
|
if (currentPartitions.contains(tpi)) {
|
||||||
if (subscriptionManagerService.isPresent()) {
|
if (subscriptionManagerService.isPresent()) {
|
||||||
subscriptionManagerService.get().onAlarmUpdate(tenantId, entityId, alarm, TbCallback.EMPTY);
|
subscriptionManagerService.get().onAlarmUpdate(tenantId, entityId, alarm, result.getAssigneeUpdate(), TbCallback.EMPTY);
|
||||||
} else {
|
} else {
|
||||||
log.warn("Possible misconfiguration because subscriptionManagerService is null!");
|
log.warn("Possible misconfiguration because subscriptionManagerService is null!");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAlarmUpdateProto(tenantId, entityId, alarm);
|
TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAlarmUpdateProto(tenantId, entityId, result.getAssigneeUpdate(), alarm);
|
||||||
clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);
|
clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,16 +17,7 @@ package org.thingsboard.server.service.telemetry.sub;
|
|||||||
|
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
import org.thingsboard.server.common.data.alarm.AlarmAssigneeUpdate;
|
||||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
|
||||||
import org.thingsboard.server.common.data.query.AlarmData;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.TreeMap;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
public class AlarmSubscriptionUpdate {
|
public class AlarmSubscriptionUpdate {
|
||||||
|
|
||||||
@ -39,16 +30,19 @@ public class AlarmSubscriptionUpdate {
|
|||||||
@Getter
|
@Getter
|
||||||
private Alarm alarm;
|
private Alarm alarm;
|
||||||
@Getter
|
@Getter
|
||||||
|
private AlarmAssigneeUpdate assignee;
|
||||||
|
@Getter
|
||||||
private boolean alarmDeleted;
|
private boolean alarmDeleted;
|
||||||
|
|
||||||
public AlarmSubscriptionUpdate(int subscriptionId, Alarm alarm) {
|
public AlarmSubscriptionUpdate(int subscriptionId, Alarm alarm) {
|
||||||
this(subscriptionId, alarm, false);
|
this(subscriptionId, alarm, null, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public AlarmSubscriptionUpdate(int subscriptionId, Alarm alarm, boolean alarmDeleted) {
|
public AlarmSubscriptionUpdate(int subscriptionId, Alarm alarm, AlarmAssigneeUpdate assignee, boolean alarmDeleted) {
|
||||||
super();
|
super();
|
||||||
this.subscriptionId = subscriptionId;
|
this.subscriptionId = subscriptionId;
|
||||||
this.alarm = alarm;
|
this.alarm = alarm;
|
||||||
|
this.assignee = assignee;
|
||||||
this.alarmDeleted = alarmDeleted;
|
this.alarmDeleted = alarmDeleted;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -562,6 +562,7 @@ message TbAlarmSubscriptionUpdateProto {
|
|||||||
string errorMsg = 4;
|
string errorMsg = 4;
|
||||||
string alarm = 5;
|
string alarm = 5;
|
||||||
bool deleted = 6;
|
bool deleted = 6;
|
||||||
|
string assignee = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
message TbAttributeUpdateProto {
|
message TbAttributeUpdateProto {
|
||||||
@ -581,6 +582,7 @@ message TbAlarmUpdateProto {
|
|||||||
int64 tenantIdMSB = 4;
|
int64 tenantIdMSB = 4;
|
||||||
int64 tenantIdLSB = 5;
|
int64 tenantIdLSB = 5;
|
||||||
string alarm = 6;
|
string alarm = 6;
|
||||||
|
string assignee = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
message TbAlarmDeleteProto {
|
message TbAlarmDeleteProto {
|
||||||
|
|||||||
@ -18,8 +18,8 @@ package org.thingsboard.server.dao.alarm;
|
|||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||||
|
import org.thingsboard.server.common.data.alarm.AlarmAssigneeUpdate;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
|
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -34,12 +34,18 @@ public class AlarmOperationResult {
|
|||||||
private final AlarmSeverity oldSeverity;
|
private final AlarmSeverity oldSeverity;
|
||||||
private final List<EntityId> propagatedEntitiesList;
|
private final List<EntityId> propagatedEntitiesList;
|
||||||
|
|
||||||
|
private final AlarmAssigneeUpdate assigneeUpdate;
|
||||||
|
|
||||||
|
public AlarmOperationResult(Alarm alarm, AlarmAssigneeUpdate assigneeUpdate, List<EntityId> propagatedEntitiesList) {
|
||||||
|
this(alarm, true, false, null, propagatedEntitiesList, assigneeUpdate);
|
||||||
|
}
|
||||||
|
|
||||||
public AlarmOperationResult(Alarm alarm, boolean successful) {
|
public AlarmOperationResult(Alarm alarm, boolean successful) {
|
||||||
this(alarm, successful, Collections.emptyList());
|
this(alarm, successful, Collections.emptyList());
|
||||||
}
|
}
|
||||||
|
|
||||||
public AlarmOperationResult(Alarm alarm, boolean successful, List<EntityId> propagatedEntitiesList) {
|
public AlarmOperationResult(Alarm alarm, boolean successful, List<EntityId> propagatedEntitiesList) {
|
||||||
this(alarm, successful, false, null, propagatedEntitiesList);
|
this(alarm, successful, false, null, propagatedEntitiesList, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public AlarmOperationResult(Alarm alarm, boolean successful, boolean created, List<EntityId> propagatedEntitiesList) {
|
public AlarmOperationResult(Alarm alarm, boolean successful, boolean created, List<EntityId> propagatedEntitiesList) {
|
||||||
@ -48,5 +54,6 @@ public class AlarmOperationResult {
|
|||||||
this.created = created;
|
this.created = created;
|
||||||
this.propagatedEntitiesList = propagatedEntitiesList;
|
this.propagatedEntitiesList = propagatedEntitiesList;
|
||||||
this.oldSeverity = null;
|
this.oldSeverity = null;
|
||||||
|
this.assigneeUpdate = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,34 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2023 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.common.data.alarm;
|
||||||
|
|
||||||
|
import lombok.Builder;
|
||||||
|
import lombok.Data;
|
||||||
|
import org.thingsboard.server.common.data.id.UserId;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public class AlarmAssignee implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 6628286223963972860L;
|
||||||
|
|
||||||
|
private final UserId id;
|
||||||
|
private final String firstName;
|
||||||
|
private final String lastName;
|
||||||
|
private final String email;
|
||||||
|
|
||||||
|
}
|
||||||
@ -0,0 +1,30 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2023 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.common.data.alarm;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public class AlarmAssigneeUpdate implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = -2391676304697483808L;
|
||||||
|
|
||||||
|
private final boolean deleted;
|
||||||
|
private final AlarmAssignee assignee;
|
||||||
|
|
||||||
|
}
|
||||||
@ -18,12 +18,13 @@ package org.thingsboard.server.common.data.query;
|
|||||||
import lombok.EqualsAndHashCode;
|
import lombok.EqualsAndHashCode;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||||
|
import org.thingsboard.server.common.data.alarm.AlarmAssignee;
|
||||||
|
import org.thingsboard.server.common.data.alarm.AlarmAssigneeUpdate;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
@EqualsAndHashCode(callSuper = true)
|
@EqualsAndHashCode(callSuper = true)
|
||||||
public class AlarmData extends AlarmInfo {
|
public class AlarmData extends AlarmInfo {
|
||||||
@ -35,7 +36,7 @@ public class AlarmData extends AlarmInfo {
|
|||||||
@Getter
|
@Getter
|
||||||
private final Map<EntityKeyType, Map<String, TsValue>> latest;
|
private final Map<EntityKeyType, Map<String, TsValue>> latest;
|
||||||
|
|
||||||
public AlarmData update(Alarm alarm) {
|
public AlarmData update(Alarm alarm, AlarmAssigneeUpdate assigneeUpdate) {
|
||||||
this.setEndTs(alarm.getEndTs());
|
this.setEndTs(alarm.getEndTs());
|
||||||
this.setSeverity(alarm.getSeverity());
|
this.setSeverity(alarm.getSeverity());
|
||||||
this.setStatus(alarm.getStatus());
|
this.setStatus(alarm.getStatus());
|
||||||
@ -47,6 +48,21 @@ public class AlarmData extends AlarmInfo {
|
|||||||
// This should be changed via separate message?
|
// This should be changed via separate message?
|
||||||
this.setAckTs(alarm.getAckTs());
|
this.setAckTs(alarm.getAckTs());
|
||||||
this.setClearTs(alarm.getClearTs());
|
this.setClearTs(alarm.getClearTs());
|
||||||
|
|
||||||
|
if (assigneeUpdate != null) {
|
||||||
|
if (assigneeUpdate.isDeleted()) {
|
||||||
|
this.setAssigneeId(null);
|
||||||
|
this.setAssigneeFirstName(null);
|
||||||
|
this.setAssigneeLastName(null);
|
||||||
|
this.setAssigneeEmail(null);
|
||||||
|
} else {
|
||||||
|
AlarmAssignee assignee = assigneeUpdate.getAssignee();
|
||||||
|
this.setAssigneeId(assignee.getId());
|
||||||
|
this.setAssigneeFirstName(assignee.getFirstName());
|
||||||
|
this.setAssigneeLastName(assignee.getLastName());
|
||||||
|
this.setAssigneeEmail(assignee.getEmail());
|
||||||
|
}
|
||||||
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -30,6 +30,8 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
|||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
import org.thingsboard.server.common.data.User;
|
import org.thingsboard.server.common.data.User;
|
||||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||||
|
import org.thingsboard.server.common.data.alarm.AlarmAssignee;
|
||||||
|
import org.thingsboard.server.common.data.alarm.AlarmAssigneeUpdate;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
import org.thingsboard.server.common.data.alarm.AlarmInfo;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmQuery;
|
import org.thingsboard.server.common.data.alarm.AlarmQuery;
|
||||||
import org.thingsboard.server.common.data.alarm.AlarmSearchStatus;
|
import org.thingsboard.server.common.data.alarm.AlarmSearchStatus;
|
||||||
@ -228,12 +230,12 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
|
|||||||
} else {
|
} else {
|
||||||
propagatedEntitiesList = new ArrayList<>(getPropagationEntityIds(result));
|
propagatedEntitiesList = new ArrayList<>(getPropagationEntityIds(result));
|
||||||
}
|
}
|
||||||
return new AlarmOperationResult(result, true, false, oldAlarmSeverity, propagatedEntitiesList);
|
return new AlarmOperationResult(result, true, false, oldAlarmSeverity, propagatedEntitiesList, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<AlarmOperationResult> ackAlarm(TenantId tenantId, AlarmId alarmId, long ackTime) {
|
public ListenableFuture<AlarmOperationResult> ackAlarm(TenantId tenantId, AlarmId alarmId, long ackTime) {
|
||||||
return getAndUpdateAsync(tenantId, alarmId, new Function<Alarm, AlarmOperationResult>() {
|
return getAndUpdateAsync(tenantId, alarmId, new Function<>() {
|
||||||
@Nullable
|
@Nullable
|
||||||
@Override
|
@Override
|
||||||
public AlarmOperationResult apply(@Nullable Alarm alarm) {
|
public AlarmOperationResult apply(@Nullable Alarm alarm) {
|
||||||
@ -286,7 +288,11 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
|
|||||||
alarm.setAssigneeId(assigneeId);
|
alarm.setAssigneeId(assigneeId);
|
||||||
alarm.setAssignTs(assignTime);
|
alarm.setAssignTs(assignTime);
|
||||||
alarm = alarmDao.save(alarm.getTenantId(), alarm);
|
alarm = alarmDao.save(alarm.getTenantId(), alarm);
|
||||||
return new AlarmOperationResult(alarm, true, new ArrayList<>(getPropagationEntityIds(alarm)));
|
AlarmInfo alarmInfo = getAlarmInfo(tenantId, alarm);
|
||||||
|
return new AlarmOperationResult(alarm, new AlarmAssigneeUpdate(false,
|
||||||
|
new AlarmAssignee(alarmInfo.getAssigneeId(), alarmInfo.getAssigneeFirstName(),
|
||||||
|
alarmInfo.getAssigneeLastName(), alarmInfo.getAssigneeEmail())
|
||||||
|
), new ArrayList<>(getPropagationEntityIds(alarm)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -304,7 +310,8 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
|
|||||||
alarm.setAssigneeId(null);
|
alarm.setAssigneeId(null);
|
||||||
alarm.setAssignTs(assignTime);
|
alarm.setAssignTs(assignTime);
|
||||||
alarm = alarmDao.save(alarm.getTenantId(), alarm);
|
alarm = alarmDao.save(alarm.getTenantId(), alarm);
|
||||||
return new AlarmOperationResult(alarm, true, new ArrayList<>(getPropagationEntityIds(alarm)));
|
return new AlarmOperationResult(alarm, new AlarmAssigneeUpdate(true, null),
|
||||||
|
new ArrayList<>(getPropagationEntityIds(alarm)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user