From b79c2407c49aceb814a17d5a6e99198c5119d020 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 14 Dec 2021 09:12:25 +0200 Subject: [PATCH] ts notification --- .../controller/TelemetryController.java | 7 ++-- .../queue/DefaultTbCoreConsumerService.java | 11 ++++-- .../DefaultSubscriptionManagerService.java | 34 ++++++++++++++++--- .../SubscriptionManagerService.java | 2 ++ .../subscription/TbSubscriptionUtils.java | 22 +++++++++--- .../DefaultTelemetrySubscriptionService.java | 22 ++++++++++++ common/cluster-api/src/main/proto/queue.proto | 10 ++++++ .../common/data/kv/BaseDeleteTsKvQuery.java | 4 ++- .../api/RuleEngineTelemetryService.java | 3 +- 9 files changed, 99 insertions(+), 16 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 7a6a1620ae..70a6b029ed 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -573,10 +573,9 @@ public class TelemetryController extends BaseController { for (String key : keys) { deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted)); } - ListenableFuture> future = tsService.remove(user.getTenantId(), entityId, deleteTsKvQueries); - Futures.addCallback(future, new FutureCallback<>() { + tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() { @Override - public void onSuccess(@Nullable List tmp) { + public void onSuccess(@Nullable Void tmp) { logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null); result.setResult(new ResponseEntity<>(HttpStatus.OK)); } @@ -586,7 +585,7 @@ public class TelemetryController extends BaseController { logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, t); result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); } - }, executor); + }); }); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index ad7a0340cf..3602aa804e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -26,14 +26,15 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; @@ -47,6 +48,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto; +import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; @@ -64,7 +66,6 @@ import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.IdMsgPair; -import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.state.DeviceStateService; @@ -467,6 +468,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService keys, TbCallback callback) { + onLocalTelemetrySubUpdate(entityId, + s -> { + if (TbSubscriptionType.TIMESERIES.equals(s.getType())) { + return (TbTimeseriesSubscription) s; + } else { + return null; + } + }, s -> true, s -> { + List subscriptionUpdate = null; + for (String key : keys) { + if (s.isAllKeys() || s.getKeyStates().containsKey(key)) { + if (subscriptionUpdate == null) { + subscriptionUpdate = new ArrayList<>(); + } + subscriptionUpdate.add(new BasicTsKvEntry(0, new StringDataEntry(key, null))); + } + } + return subscriptionUpdate; + }, false); + callback.onSuccess(); + } + private void onLocalTelemetrySubUpdate(EntityId entityId, Function castFunction, Predicate filterFunction, diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java index 37850b701f..67f2e10614 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java @@ -40,6 +40,8 @@ public interface SubscriptionManagerService extends ApplicationListener keys, TbCallback empty); + void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys, TbCallback callback); + void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback); void onAlarmDeleted(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java index 25b67d3a06..82e0b17f67 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.subscription; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; @@ -30,25 +31,25 @@ import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto; import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType; import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto; +import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto; +import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionKetStateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateTsValue; +import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto; import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode; import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; @@ -207,6 +208,19 @@ public class TbSubscriptionUtils { return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); } + public static ToCoreMsg toTimeseriesDeleteProto(TenantId tenantId, EntityId entityId, List keys) { + TbTimeSeriesDeleteProto.Builder builder = TbTimeSeriesDeleteProto.newBuilder(); + builder.setEntityType(entityId.getEntityType().name()); + builder.setEntityIdMSB(entityId.getId().getMostSignificantBits()); + builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); + builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); + builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); + builder.addAllKeys(keys); + SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder(); + msgBuilder.setTsDelete(builder); + return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); + } + public static ToCoreMsg toAttributesUpdateProto(TenantId tenantId, EntityId entityId, String scope, List attributes) { TbAttributeUpdateProto.Builder builder = TbAttributeUpdateProto.newBuilder(); builder.setEntityType(entityId.getEntityType().name()); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index a3771988cf..dba664013d 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; @@ -271,6 +272,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer }, tsCallBackExecutor); } + @Override + public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback) { + ListenableFuture> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries); + addVoidCallback(deleteFuture, callback); + addWsCallback(deleteFuture, success -> onTimeSeriesDelete(tenantId, entityId, keys)); + } + @Override public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback callback) { saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value) @@ -337,6 +345,20 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } + private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); + if (currentPartitions.contains(tpi)) { + if (subscriptionManagerService.isPresent()) { + subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, keys, TbCallback.EMPTY); + } else { + log.warn("Possible misconfiguration because subscriptionManagerService is null!"); + } + } else { + TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toTimeseriesDeleteProto(tenantId, entityId, keys); + clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null); + } + } + private void addVoidCallback(ListenableFuture saveFuture, final FutureCallback callback) { Futures.addCallback(saveFuture, new FutureCallback() { @Override diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index dca92c1c7a..d576f38340 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -561,6 +561,15 @@ message TbAttributeDeleteProto { repeated string keys = 7; } +message TbTimeSeriesDeleteProto { + string entityType = 1; + int64 entityIdMSB = 2; + int64 entityIdLSB = 3; + int64 tenantIdMSB = 4; + int64 tenantIdLSB = 5; + repeated string keys = 6; +} + message TbTimeSeriesUpdateProto { string entityType = 1; int64 entityIdMSB = 2; @@ -614,6 +623,7 @@ message SubscriptionMgrMsgProto { TbAlarmSubscriptionProto alarmSub = 7; TbAlarmUpdateProto alarmUpdate = 8; TbAlarmDeleteProto alarmDelete = 9; + TbTimeSeriesDeleteProto tsDelete = 10; } message LocalSubscriptionServiceMsgProto { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java index 6c9dfbd198..3ffedaa69a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java @@ -31,5 +31,7 @@ public class BaseDeleteTsKvQuery extends BaseTsKvQuery implements DeleteTsKvQuer this(key, startTs, endTs, false); } - + public Boolean getRewriteLatestIfDeleted() { + return rewriteLatestIfDeleted; + } } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index e641c83144..a19d0fad0c 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.Collection; @@ -54,5 +55,5 @@ public interface RuleEngineTelemetryService { void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback> callback); - + void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback); }