ts notification

This commit is contained in:
YevhenBondarenko 2021-12-14 09:12:25 +02:00
parent 6d97d9466f
commit b79c2407c4
9 changed files with 99 additions and 16 deletions

View File

@ -573,10 +573,9 @@ public class TelemetryController extends BaseController {
for (String key : keys) {
deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted));
}
ListenableFuture<List<Void>> future = tsService.remove(user.getTenantId(), entityId, deleteTsKvQueries);
Futures.addCallback(future, new FutureCallback<>() {
tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Void> tmp) {
public void onSuccess(@Nullable Void tmp) {
logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null);
result.setResult(new ResponseEntity<>(HttpStatus.OK));
}
@ -586,7 +585,7 @@ public class TelemetryController extends BaseController {
logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, t);
result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
}
}, executor);
});
});
}

View File

@ -26,14 +26,15 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
@ -47,6 +48,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
@ -64,7 +66,6 @@ import org.thingsboard.server.service.ota.OtaPackageStateService;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.state.DeviceStateService;
@ -467,6 +468,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
proto.getScope(), proto.getKeysList(), callback);
} else if (msg.hasTsDelete()) {
TbTimeSeriesDeleteProto proto = msg.getTsDelete();
subscriptionManagerService.onTimeSeriesDelete(
new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
proto.getKeysList(), callback);
} else if (msg.hasAlarmUpdate()) {
TbAlarmUpdateProto proto = msg.getAlarmUpdate();
subscriptionManagerService.onAlarmUpdate(

View File

@ -19,8 +19,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.alarm.Alarm;
@ -40,20 +42,20 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.gen.transport.TransportProtos.*;
import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmSubscriptionUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateTsValue;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateValueListProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
@ -337,6 +339,30 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
callback.onSuccess();
}
@Override
public void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback) {
onLocalTelemetrySubUpdate(entityId,
s -> {
if (TbSubscriptionType.TIMESERIES.equals(s.getType())) {
return (TbTimeseriesSubscription) s;
} else {
return null;
}
}, s -> true, s -> {
List<TsKvEntry> subscriptionUpdate = null;
for (String key : keys) {
if (s.isAllKeys() || s.getKeyStates().containsKey(key)) {
if (subscriptionUpdate == null) {
subscriptionUpdate = new ArrayList<>();
}
subscriptionUpdate.add(new BasicTsKvEntry(0, new StringDataEntry(key, null)));
}
}
return subscriptionUpdate;
}, false);
callback.onSuccess();
}
private <T extends TbSubscription> void onLocalTelemetrySubUpdate(EntityId entityId,
Function<TbSubscription, T> castFunction,
Predicate<T> filterFunction,

View File

@ -40,6 +40,8 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio
void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback empty);
void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback);
void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback);
void onAlarmDeleted(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback);

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.subscription;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
@ -30,25 +31,25 @@ import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeSubscriptionProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionKetStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateTsValue;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesSubscriptionProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto;
import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
@ -207,6 +208,19 @@ public class TbSubscriptionUtils {
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
}
public static ToCoreMsg toTimeseriesDeleteProto(TenantId tenantId, EntityId entityId, List<String> keys) {
TbTimeSeriesDeleteProto.Builder builder = TbTimeSeriesDeleteProto.newBuilder();
builder.setEntityType(entityId.getEntityType().name());
builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
builder.addAllKeys(keys);
SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
msgBuilder.setTsDelete(builder);
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
}
public static ToCoreMsg toAttributesUpdateProto(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
TbAttributeUpdateProto.Builder builder = TbAttributeUpdateProto.newBuilder();
builder.setEntityType(entityId.getEntityType().name());

View File

@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
@ -271,6 +272,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}, tsCallBackExecutor);
}
@Override
public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries);
addVoidCallback(deleteFuture, callback);
addWsCallback(deleteFuture, success -> onTimeSeriesDelete(tenantId, entityId, keys));
}
@Override
public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) {
saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value)
@ -337,6 +345,20 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
}
private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
if (currentPartitions.contains(tpi)) {
if (subscriptionManagerService.isPresent()) {
subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, keys, TbCallback.EMPTY);
} else {
log.warn("Possible misconfiguration because subscriptionManagerService is null!");
}
} else {
TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toTimeseriesDeleteProto(tenantId, entityId, keys);
clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);
}
}
private <S> void addVoidCallback(ListenableFuture<S> saveFuture, final FutureCallback<Void> callback) {
Futures.addCallback(saveFuture, new FutureCallback<S>() {
@Override

View File

@ -561,6 +561,15 @@ message TbAttributeDeleteProto {
repeated string keys = 7;
}
message TbTimeSeriesDeleteProto {
string entityType = 1;
int64 entityIdMSB = 2;
int64 entityIdLSB = 3;
int64 tenantIdMSB = 4;
int64 tenantIdLSB = 5;
repeated string keys = 6;
}
message TbTimeSeriesUpdateProto {
string entityType = 1;
int64 entityIdMSB = 2;
@ -614,6 +623,7 @@ message SubscriptionMgrMsgProto {
TbAlarmSubscriptionProto alarmSub = 7;
TbAlarmUpdateProto alarmUpdate = 8;
TbAlarmDeleteProto alarmDelete = 9;
TbTimeSeriesDeleteProto tsDelete = 10;
}
message LocalSubscriptionServiceMsgProto {

View File

@ -31,5 +31,7 @@ public class BaseDeleteTsKvQuery extends BaseTsKvQuery implements DeleteTsKvQuer
this(key, startTs, endTs, false);
}
public Boolean getRewriteLatestIfDeleted() {
return rewriteLatestIfDeleted;
}
}

View File

@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.Collection;
@ -54,5 +55,5 @@ public interface RuleEngineTelemetryService {
void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback);
void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback);
}