From faaf07b1eaf87bc03ab3ea0d40a584a71c758ed7 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 16 Aug 2022 17:34:02 +0300 Subject: [PATCH 01/15] Edge sync functionality - added cluster support --- .../server/actors/app/AppActor.java | 7 +- .../server/actors/tenant/TenantActor.java | 27 ++++++- .../server/controller/BaseController.java | 5 +- .../server/controller/EdgeController.java | 20 ++++- .../service/edge/rpc/EdgeGrpcService.java | 78 +++++++++++++++---- .../service/edge/rpc/EdgeRpcService.java | 11 ++- .../queue/DefaultTbClusterService.java | 26 ++++++- .../queue/DefaultTbCoreConsumerService.java | 24 +++--- .../server/cluster/TbClusterService.java | 6 ++ common/cluster-api/src/main/proto/queue.proto | 2 + .../server/common/msg/MsgType.java | 8 +- .../common/msg/edge/FromEdgeSyncResponse.java | 41 ++++++++++ .../common/msg/edge/ToEdgeSyncRequest.java | 39 ++++++++++ 13 files changed, 253 insertions(+), 41 deletions(-) create mode 100644 common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java create mode 100644 common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 2817b1e359..4583c78d11 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -36,7 +36,6 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.aware.TenantAwareMsg; -import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; @@ -105,7 +104,9 @@ public class AppActor extends ContextAwareActor { onToDeviceActorMsg((TenantAwareMsg) msg, true); break; case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: - onToTenantActorMsg((EdgeEventUpdateMsg) msg); + case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG: + case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG: + onToTenantActorMsg((TenantAwareMsg) msg); break; case SESSION_TIMEOUT_MSG: ctx.broadcastToChildrenByType(msg, EntityType.TENANT); @@ -193,7 +194,7 @@ public class AppActor extends ContextAwareActor { () -> new TenantActor.ActorCreator(systemContext, tenantId)); } - private void onToTenantActorMsg(EdgeEventUpdateMsg msg) { + private void onToTenantActorMsg(TenantAwareMsg msg) { TbActorRef target = null; if (ModelConstants.SYSTEM_TENANT.equals(msg.getTenantId())) { log.warn("Message has system tenant id: {}", msg); diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index c22b44d7cf..5906b73db8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -48,6 +48,8 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; @@ -167,7 +169,9 @@ public class TenantActor extends RuleChainManagerActor { onRuleChainMsg((RuleChainAwareMsg) msg); break; case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: - onToEdgeSessionMsg((EdgeEventUpdateMsg) msg); + case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG: + case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG: + onToEdgeSessionMsg(msg); break; default: return false; @@ -271,9 +275,24 @@ public class TenantActor extends RuleChainManagerActor { () -> new DeviceActorCreator(systemContext, tenantId, deviceId)); } - private void onToEdgeSessionMsg(EdgeEventUpdateMsg msg) { - log.trace("[{}] onToEdgeSessionMsg [{}]", msg.getTenantId(), msg); - systemContext.getEdgeRpcService().onEdgeEvent(tenantId, msg.getEdgeId()); + private void onToEdgeSessionMsg(TbActorMsg msg) { + switch (msg.getMsgType()) { + case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: + EdgeEventUpdateMsg edgeEventUpdateMsg = (EdgeEventUpdateMsg) msg; + log.trace("[{}] onToEdgeSessionMsg [{}]", edgeEventUpdateMsg.getTenantId(), msg); + systemContext.getEdgeRpcService().onEdgeEvent(tenantId, edgeEventUpdateMsg.getEdgeId()); + break; + case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG: + ToEdgeSyncRequest toEdgeSyncRequest = (ToEdgeSyncRequest) msg; + log.trace("[{}] toEdgeSyncRequest [{}]", toEdgeSyncRequest.getTenantId(), msg); + systemContext.getEdgeRpcService().startSyncProcess(tenantId, toEdgeSyncRequest.getEdgeId(), toEdgeSyncRequest.getId()); + break; + case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG: + FromEdgeSyncResponse fromEdgeSyncResponse = (FromEdgeSyncResponse) msg; + log.trace("[{}] fromEdgeSyncResponse [{}]", fromEdgeSyncResponse.getTenantId(), msg); + systemContext.getEdgeRpcService().processSyncResponse(fromEdgeSyncResponse); + break; + } } private ApiUsageState getApiUsageState() { diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 1e9958cc25..2e77045771 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -269,10 +269,7 @@ public abstract class BaseController { protected EdgeService edgeService; @Autowired(required = false) - protected EdgeNotificationService edgeNotificationService; - - @Autowired(required = false) - protected EdgeRpcService edgeGrpcService; + protected EdgeRpcService edgeRpcService; @Autowired protected TbNotificationEntityService notificationEntityService; diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index 59c9eb39c1..6a5dd44d0b 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; @@ -32,6 +33,7 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; import org.thingsboard.rule.engine.flow.TbRuleChainInputNode; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.EntitySubtype; @@ -47,6 +49,8 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; @@ -61,6 +65,7 @@ import org.thingsboard.server.service.security.permission.Resource; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; import static org.thingsboard.server.controller.ControllerConstants.CUSTOMER_ID_PARAM_DESCRIPTION; @@ -529,24 +534,35 @@ public class EdgeController extends BaseController { "All entities that are assigned to particular edge are going to be send to remote edge service." + TENANT_AUTHORITY_PARAGRAPH) @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/edge/sync/{edgeId}", method = RequestMethod.POST) - public void syncEdge(@ApiParam(value = EDGE_ID_PARAM_DESCRIPTION, required = true) + public DeferredResult syncEdge(@ApiParam(value = EDGE_ID_PARAM_DESCRIPTION, required = true) @PathVariable("edgeId") String strEdgeId) throws ThingsboardException { checkParameter("edgeId", strEdgeId); try { + final DeferredResult response = new DeferredResult<>(); if (isEdgesEnabled()) { EdgeId edgeId = new EdgeId(toUUID(strEdgeId)); edgeId = checkNotNull(edgeId); SecurityUser user = getCurrentUser(); TenantId tenantId = user.getTenantId(); - edgeGrpcService.startSyncProcess(tenantId, edgeId); + ToEdgeSyncRequest request = new ToEdgeSyncRequest(UUID.randomUUID(), tenantId, edgeId); + edgeRpcService.processSyncRequest(request, fromEdgeSyncResponse -> reply(response, fromEdgeSyncResponse)); } else { throw new ThingsboardException("Edges support disabled", ThingsboardErrorCode.GENERAL); } + return response; } catch (Exception e) { throw handleException(e); } } + private void reply(DeferredResult response, FromEdgeSyncResponse fromEdgeSyncResponse) { + if (fromEdgeSyncResponse.isSuccess()) { + response.setResult(new ResponseEntity<>(HttpStatus.OK)); + } else { + response.setErrorResult(new ThingsboardException("Edge is not connected", ThingsboardErrorCode.GENERAL)); + } + } + @ApiOperation(value = "Find missing rule chains (findMissingToRelatedRuleChains)", notes = "Returns list of rule chains ids that are not assigned to particular edge, but these rule chains are present in the already assigned rule chains to edge." + TENANT_AUTHORITY_PARAGRAPH) @PreAuthorize("hasAuthority('TENANT_ADMIN')") diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index c56fcd4ed9..18d53fea71 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -26,6 +26,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.ResourceUtils; import org.thingsboard.server.common.data.edge.Edge; @@ -34,6 +35,8 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.gen.edge.v1.EdgeRpcServiceGrpc; import org.thingsboard.server.gen.edge.v1.RequestMsg; import org.thingsboard.server.gen.edge.v1.ResponseMsg; @@ -59,6 +62,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; @Service @Slf4j @@ -71,6 +75,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private final Map sessionNewEvents = new HashMap<>(); private final ConcurrentMap> sessionEdgeEventChecks = new ConcurrentHashMap<>(); + private final ConcurrentMap> localSyncEdgeRequests = new ConcurrentHashMap<>(); + @Value("${edges.rpc.port}") private int rpcPort; @Value("${edges.rpc.ssl.enabled}") @@ -98,12 +104,17 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @Autowired private TelemetrySubscriptionService tsSubService; + @Autowired + private TbClusterService clusterService; + private Server server; private ScheduledExecutorService edgeEventProcessingExecutorService; private ScheduledExecutorService sendDownlinkExecutorService; + private ScheduledExecutorService syncScheduler; + @PostConstruct public void init() { log.info("Initializing Edge RPC service!"); @@ -131,6 +142,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler")); this.sendDownlinkExecutorService = Executors.newScheduledThreadPool(sendSchedulerPoolSize, ThingsBoardThreadFactory.forName("edge-send-scheduler")); + this.syncScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("edge-sync-scheduler")); log.info("Edge RPC service initialized!"); } @@ -191,16 +203,19 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i @Override public void onEdgeEvent(TenantId tenantId, EdgeId edgeId) { - log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId.getId()); - final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); - newEventLock.lock(); - try { - if (Boolean.FALSE.equals(sessionNewEvents.get(edgeId))) { - log.trace("[{}] set session new events flag to true [{}]", tenantId, edgeId.getId()); - sessionNewEvents.put(edgeId, true); + EdgeGrpcSession session = sessions.get(edgeId); + if (session != null && session.isConnected()) { + log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId.getId()); + final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); + newEventLock.lock(); + try { + if (Boolean.FALSE.equals(sessionNewEvents.get(edgeId))) { + log.trace("[{}] set session new events flag to true [{}]", tenantId, edgeId.getId()); + sessionNewEvents.put(edgeId, true); + } + } finally { + newEventLock.unlock(); } - } finally { - newEventLock.unlock(); } } @@ -221,13 +236,48 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } @Override - public void startSyncProcess(TenantId tenantId, EdgeId edgeId) { + public void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId) { EdgeGrpcSession session = sessions.get(edgeId); - if (session != null && session.isConnected()) { - session.startSyncProcess(tenantId, edgeId); + if (session != null) { + boolean success = false; + if (session.isConnected()) { + session.startSyncProcess(tenantId, edgeId); + success = true; + } + clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success)); + } + } + + @Override + public void processSyncRequest(ToEdgeSyncRequest request, Consumer responseConsumer) { + log.trace("[{}][{}] Processing sync edge request [{}]", request.getTenantId(), request.getId(), request.getEdgeId()); + UUID requestId = request.getId(); + localSyncEdgeRequests.put(requestId, responseConsumer); + clusterService.pushEdgeSyncRequestToCore(request); + scheduleSyncRequestTimeout(request, requestId); + } + + private void scheduleSyncRequestTimeout(ToEdgeSyncRequest request, UUID requestId) { + log.trace("[{}] scheduling sync edge request", requestId); + syncScheduler.schedule(() -> { + log.trace("[{}] checking if sync edge request is not processed...", requestId); + Consumer consumer = localSyncEdgeRequests.remove(requestId); + if (consumer != null) { + log.trace("[{}] timeout for processing sync edge request.", requestId); + consumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false)); + } + }, 10, TimeUnit.SECONDS); + } + + @Override + public void processSyncResponse(FromEdgeSyncResponse response) { + log.trace("[{}] Received response from sync service: [{}]", response.getId(), response); + UUID requestId = response.getId(); + Consumer consumer = localSyncEdgeRequests.remove(requestId); + if (consumer != null) { + consumer.accept(response); } else { - log.error("[{}] Edge is not connected [{}]", tenantId, edgeId); - throw new RuntimeException("Edge is not connected"); + log.trace("[{}] Unknown or stale sync response received [{}]", requestId, response); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java index ad65c15235..9564e195de 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java @@ -18,6 +18,11 @@ package org.thingsboard.server.service.edge.rpc; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; + +import java.util.UUID; +import java.util.function.Consumer; public interface EdgeRpcService { @@ -27,5 +32,9 @@ public interface EdgeRpcService { void onEdgeEvent(TenantId tenantId, EdgeId edgeId); - void startSyncProcess(TenantId tenantId, EdgeId edgeId); + void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId); + + void processSyncRequest(ToEdgeSyncRequest request, Consumer responseConsumer); + + void processSyncResponse(FromEdgeSyncResponse response); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 062e4128d4..7c2ddf09f9 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -49,6 +49,8 @@ import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -351,12 +353,32 @@ public class DefaultTbClusterService implements TbClusterService { log.trace("[{}] Processing edge {} event update ", tenantId, edgeId); EdgeEventUpdateMsg msg = new EdgeEventUpdateMsg(tenantId, edgeId); byte[] msgBytes = encodingService.encode(msg); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdateMsg(ByteString.copyFrom(msgBytes)).build(); + pushEdgeSyncMsgToCore(edgeId, toCoreMsg); + } + + @Override + public void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest) { + log.trace("[{}] Processing edge sync request {} ", toEdgeSyncRequest.getTenantId(), toEdgeSyncRequest); + byte[] msgBytes = encodingService.encode(toEdgeSyncRequest); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToEdgeSyncRequestMsg(ByteString.copyFrom(msgBytes)).build(); + pushEdgeSyncMsgToCore(toEdgeSyncRequest.getEdgeId(), toCoreMsg); + } + + @Override + public void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse) { + log.trace("[{}] Processing edge sync response {}", fromEdgeSyncResponse.getTenantId(), fromEdgeSyncResponse); + byte[] msgBytes = encodingService.encode(fromEdgeSyncResponse); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setFromEdgeSyncResponseMsg(ByteString.copyFrom(msgBytes)).build(); + pushEdgeSyncMsgToCore(fromEdgeSyncResponse.getEdgeId(), toCoreMsg); + } + + private void pushEdgeSyncMsgToCore(EdgeId edgeId, ToCoreNotificationMsg toCoreMsg) { TbQueueProducer> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer(); Set tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE); for (String serviceId : tbCoreServices) { TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId); - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdateMsg(ByteString.copyFrom(msgBytes)).build(); - toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEdgeId().getId(), toCoreMsg), null); + toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(edgeId.getId(), toCoreMsg), null); toCoreNfs.incrementAndGet(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index a3bae8bc4e..745ffa7bab 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -20,8 +20,6 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; -import org.springframework.core.annotation.Order; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; @@ -76,7 +74,6 @@ import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.subscription.SubscriptionManagerService; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; -import org.thingsboard.server.service.sync.vc.EntitiesVersionControlService; import org.thingsboard.server.service.sync.vc.GitVersionControlQueueService; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; @@ -317,13 +314,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreNotification.getEdgeEventUpdateMsg().toByteArray()); - if (actorMsg.isPresent()) { - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tellWithHighPriority(actorMsg.get()); - } - callback.onSuccess(); + } else if (!toCoreNotification.getEdgeEventUpdateMsg().isEmpty()) { + forwardToAppActor(id, encodingService.decode(toCoreNotification.getEdgeEventUpdateMsg().toByteArray()), callback); + } else if (!toCoreNotification.getToEdgeSyncRequestMsg().isEmpty()) { + forwardToAppActor(id, encodingService.decode(toCoreNotification.getToEdgeSyncRequestMsg().toByteArray()), callback); + } else if (!toCoreNotification.getFromEdgeSyncResponseMsg().isEmpty()) { + forwardToAppActor(id, encodingService.decode(toCoreNotification.getFromEdgeSyncResponseMsg().toByteArray()), callback); } else if (toCoreNotification.hasQueueUpdateMsg()) { TransportProtos.QueueUpdateMsg queue = toCoreNotification.getQueueUpdateMsg(); partitionService.updateQueue(queue); @@ -552,6 +548,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg, TbCallback callback) { + if (actorMsg.isPresent()) { + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); + actorContext.tellWithHighPriority(actorMsg.get()); + } + callback.onSuccess(); + } + private void throwNotHandled(Object msg, TbCallback callback) { log.warn("Message not handled: {}", msg); callback.onFailure(new RuntimeException("Message not handled!")); diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index d258aa3820..b8f814d613 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -29,6 +29,8 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; @@ -88,5 +90,9 @@ public interface TbClusterService extends TbQueueClusterService { void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId); + void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest); + + void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse); + void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action); } diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 1714e109a3..cb3443a71f 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -932,6 +932,8 @@ message ToCoreNotificationMsg { QueueUpdateMsg queueUpdateMsg = 5; QueueDeleteMsg queueDeleteMsg = 6; VersionControlResponseMsg vcResponseMsg = 7; + bytes toEdgeSyncRequestMsg = 8; + bytes fromEdgeSyncResponseMsg = 9; } /* Messages that are handled by ThingsBoard RuleEngine Service */ diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index 269aa1e727..98c185b6b5 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -122,6 +122,12 @@ public enum MsgType { /** * Message that is sent on Edge Event to Edge Session */ - EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG; + EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG, + + /** + * Messages that are sent to and from edge session to start edge synchronization process + */ + EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG, + EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG; } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java new file mode 100644 index 0000000000..4e6d456d22 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.edge; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.aware.TenantAwareMsg; +import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg; + +import java.util.UUID; + +@AllArgsConstructor +@Getter +public class FromEdgeSyncResponse implements TenantAwareMsg, ToAllNodesMsg { + + private final UUID id; + private final TenantId tenantId; + private final EdgeId edgeId; + private final boolean success; + + @Override + public MsgType getMsgType() { + return MsgType.EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG; + } +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java new file mode 100644 index 0000000000..6e1b0df3ad --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.edge; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.aware.TenantAwareMsg; +import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg; + +import java.util.UUID; + +@AllArgsConstructor +@Getter +public class ToEdgeSyncRequest implements TenantAwareMsg, ToAllNodesMsg { + private final UUID id; + private final TenantId tenantId; + private final EdgeId edgeId; + + @Override + public MsgType getMsgType() { + return MsgType.EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG; + } +} From ecda9c8e2fb1cc7254798c11c7ee8ea9763d7630 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 6 Sep 2022 13:38:10 +0300 Subject: [PATCH 02/15] Notify devices in case shared attribute updates from edge --- .../edge/rpc/processor/BaseEdgeProcessor.java | 8 ++++ .../rpc/processor/TelemetryEdgeProcessor.java | 41 +++++++++--------- .../thingsboard/server/edge/BaseEdgeTest.java | 42 ++++++++++++++++++- 3 files changed, 69 insertions(+), 22 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 9d6199bdcd..2be5a5684d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -74,9 +74,11 @@ import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstruct import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.WidgetTypeMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.WidgetsBundleMsgConstructor; +import org.thingsboard.server.service.entitiy.TbNotificationEntityService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.state.DeviceStateService; +import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.util.ArrayList; import java.util.List; @@ -88,6 +90,12 @@ public abstract class BaseEdgeProcessor { protected static final int DEFAULT_PAGE_SIZE = 1000; + @Autowired + protected TelemetrySubscriptionService tsSubService; + + @Autowired(required = false) + protected TbNotificationEntityService notificationEntityService; + @Autowired protected RuleChainService ruleChainService; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index ad6446cb95..32833da649 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.id.AssetId; @@ -57,6 +58,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; +import org.thingsboard.server.controller.BaseController; import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EntityDataProto; @@ -101,7 +103,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { } if (entityData.hasAttributesUpdatedMsg()) { metaData.putValue("scope", entityData.getPostAttributeScope()); - result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); + result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); } if (entityData.hasPostTelemetryMsg()) { result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); @@ -221,39 +223,36 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { return futureToSet; } - private ListenableFuture processAttributesUpdate(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processAttributesUpdate(TenantId tenantId, + EntityId entityId, + TransportProtos.PostAttributeMsg msg, + TbMsgMetaData metaData) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - Set attributes = JsonConverter.convertToAttributes(json); - ListenableFuture> future = attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes)); - Futures.addCallback(future, new FutureCallback<>() { + List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json)); + String scope = metaData.getValue("scope"); + tsSubService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback() { @Override - public void onSuccess(@Nullable List keys) { - var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); - TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), DataConstants.ATTRIBUTES_UPDATED, entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null); - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't process attributes update [{}]", msg, t); - futureToSet.setException(t); - } - }); + public void onSuccess(@Nullable Void tmp) { + logAttributesUpdated(tenantId, entityId, scope, attributes, null); + futureToSet.set(null); } @Override public void onFailure(Throwable t) { log.error("Can't process attributes update [{}]", msg, t); + logAttributesUpdated(tenantId, entityId, scope, attributes, t); futureToSet.setException(t); } - }, dbCallbackExecutorService); + }); return futureToSet; } + private void logAttributesUpdated(TenantId tenantId, EntityId entityId, String scope, List attributes, Throwable e) { + notificationEntityService.logEntityAction(tenantId, entityId, ActionType.ATTRIBUTES_UPDATED, null, + BaseController.toException(e), scope, attributes); + } + private ListenableFuture processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) { SettableFuture futureToSet = SettableFuture.create(); String scope = attributeDeleteMsg.getScope(); diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index dcd354ace5..34f71a9ffa 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -26,6 +26,7 @@ import com.google.protobuf.AbstractMessage; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageLite; +import io.netty.handler.codec.mqtt.MqttQoS; import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; @@ -162,10 +163,11 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.transport.AbstractTransportIntegrationTest; import org.thingsboard.server.transport.lwm2m.AbstractLwM2MIntegrationTest; +import org.thingsboard.server.transport.mqtt.MqttTestCallback; +import org.thingsboard.server.transport.mqtt.MqttTestClient; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -174,11 +176,13 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.thingsboard.server.common.data.ota.OtaPackageType.FIRMWARE; @TestPropertySource(properties = { "edges.enabled=true", + "transport.mqtt.enabled=true" }) abstract public class BaseEdgeTest extends AbstractControllerTest { @@ -2147,6 +2151,42 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { Assert.assertEquals(queueUpdateMsg.getIdLSB(), savedQueue.getUuidId().getLeastSignificantBits()); } + @Test + public void updateSharedAttributeOnCloudAndValidateDeviceSubscription() throws Exception { + Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); + + DeviceCredentials deviceCredentials = doGet("/api/device/" + device.getUuidId() + "/credentials", DeviceCredentials.class); + + MqttTestClient client = new MqttTestClient(); + client.connectAndWait(deviceCredentials.getCredentialsId()); + MqttTestCallback onUpdateCallback = new MqttTestCallback(); + client.setCallback(onUpdateCallback); + client.subscribeAndWait("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE); + + edgeImitator.expectResponsesAmount(1); + + JsonObject attributesData = new JsonObject(); + String attrKey = "sharedAttrName"; + String attrValue = "sharedAttrValue"; + attributesData.addProperty(attrKey, attrValue); + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder(); + entityDataBuilder.setEntityType(device.getId().getEntityType().name()); + entityDataBuilder.setEntityIdMSB(device.getId().getId().getMostSignificantBits()); + entityDataBuilder.setEntityIdLSB(device.getId().getId().getLeastSignificantBits()); + entityDataBuilder.setAttributesUpdatedMsg(JsonConverter.convertToAttributesProto(attributesData)); + entityDataBuilder.setPostAttributeScope(DataConstants.SHARED_SCOPE); + uplinkMsgBuilder.addEntityData(entityDataBuilder.build()); + + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + + Assert.assertTrue(onUpdateCallback.getSubscribeLatch().await(5, TimeUnit.SECONDS)); + + assertEquals(JacksonUtil.OBJECT_MAPPER.createObjectNode().put(attrKey, attrValue), + JacksonUtil.fromBytes(onUpdateCallback.getPayloadBytes())); + } + // Utility methods private Device saveDeviceOnCloudAndVerifyDeliveryToEdge() throws Exception { From 32fcfdac92f5fe29907d58195a15f9e1a4b01e47 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 6 Sep 2022 16:24:18 +0300 Subject: [PATCH 03/15] Moved edge rpc processing logic from tenant actor to edge service. Execute requests to edge service in a separate executor service --- .../server/actors/app/AppActor.java | 7 +- .../server/actors/tenant/TenantActor.java | 26 +----- .../service/edge/rpc/EdgeGrpcService.java | 89 ++++++++++++------- .../service/edge/rpc/EdgeRpcService.java | 10 +-- .../common/msg/edge/EdgeEventUpdateMsg.java | 4 +- .../common/msg/edge/EdgeSessionMsg.java | 24 +++++ .../common/msg/edge/FromEdgeSyncResponse.java | 4 +- .../common/msg/edge/ToEdgeSyncRequest.java | 4 +- 8 files changed, 97 insertions(+), 71 deletions(-) create mode 100644 common/message/src/main/java/org/thingsboard/server/common/msg/edge/EdgeSessionMsg.java diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 4583c78d11..eeb6d8e82e 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.aware.TenantAwareMsg; +import org.thingsboard.server.common.msg.edge.EdgeSessionMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; @@ -106,7 +107,7 @@ public class AppActor extends ContextAwareActor { case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG: case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG: - onToTenantActorMsg((TenantAwareMsg) msg); + onToEdgeSessionMsg((EdgeSessionMsg) msg); break; case SESSION_TIMEOUT_MSG: ctx.broadcastToChildrenByType(msg, EntityType.TENANT); @@ -194,7 +195,7 @@ public class AppActor extends ContextAwareActor { () -> new TenantActor.ActorCreator(systemContext, tenantId)); } - private void onToTenantActorMsg(TenantAwareMsg msg) { + private void onToEdgeSessionMsg(EdgeSessionMsg msg) { TbActorRef target = null; if (ModelConstants.SYSTEM_TENANT.equals(msg.getTenantId())) { log.warn("Message has system tenant id: {}", msg); @@ -204,7 +205,7 @@ public class AppActor extends ContextAwareActor { if (target != null) { target.tellWithHighPriority(msg); } else { - log.debug("[{}] Invalid edge event update msg: {}", msg.getTenantId(), msg); + log.debug("[{}] Invalid edge session msg: {}", msg.getTenantId(), msg); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 5906b73db8..5718f581ad 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -47,9 +47,7 @@ import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg; -import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; -import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; -import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; +import org.thingsboard.server.common.msg.edge.EdgeSessionMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; @@ -171,7 +169,7 @@ public class TenantActor extends RuleChainManagerActor { case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG: case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG: - onToEdgeSessionMsg(msg); + onToEdgeSessionMsg((EdgeSessionMsg) msg); break; default: return false; @@ -275,24 +273,8 @@ public class TenantActor extends RuleChainManagerActor { () -> new DeviceActorCreator(systemContext, tenantId, deviceId)); } - private void onToEdgeSessionMsg(TbActorMsg msg) { - switch (msg.getMsgType()) { - case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: - EdgeEventUpdateMsg edgeEventUpdateMsg = (EdgeEventUpdateMsg) msg; - log.trace("[{}] onToEdgeSessionMsg [{}]", edgeEventUpdateMsg.getTenantId(), msg); - systemContext.getEdgeRpcService().onEdgeEvent(tenantId, edgeEventUpdateMsg.getEdgeId()); - break; - case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG: - ToEdgeSyncRequest toEdgeSyncRequest = (ToEdgeSyncRequest) msg; - log.trace("[{}] toEdgeSyncRequest [{}]", toEdgeSyncRequest.getTenantId(), msg); - systemContext.getEdgeRpcService().startSyncProcess(tenantId, toEdgeSyncRequest.getEdgeId(), toEdgeSyncRequest.getId()); - break; - case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG: - FromEdgeSyncResponse fromEdgeSyncResponse = (FromEdgeSyncResponse) msg; - log.trace("[{}] fromEdgeSyncResponse [{}]", fromEdgeSyncResponse.getTenantId(), msg); - systemContext.getEdgeRpcService().processSyncResponse(fromEdgeSyncResponse); - break; - } + private void onToEdgeSessionMsg(EdgeSessionMsg msg) { + systemContext.getEdgeRpcService().onToEdgeSessionMsg(tenantId, msg); } private ApiUsageState getApiUsageState() { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 18d53fea71..70abad7ba6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -35,6 +35,8 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; +import org.thingsboard.server.common.msg.edge.EdgeSessionMsg; import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.gen.edge.v1.EdgeRpcServiceGrpc; @@ -113,7 +115,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private ScheduledExecutorService sendDownlinkExecutorService; - private ScheduledExecutorService syncScheduler; + private ScheduledExecutorService executorService; @PostConstruct public void init() { @@ -140,9 +142,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i log.error("Failed to start Edge RPC server!", e); throw new RuntimeException("Failed to start Edge RPC server!"); } - this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler")); + this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-event-check-scheduler")); this.sendDownlinkExecutorService = Executors.newScheduledThreadPool(sendSchedulerPoolSize, ThingsBoardThreadFactory.forName("edge-send-scheduler")); - this.syncScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("edge-sync-scheduler")); + this.executorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("edge-service")); log.info("Edge RPC service initialized!"); } @@ -165,6 +167,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i if (sendDownlinkExecutorService != null) { sendDownlinkExecutorService.shutdownNow(); } + if (executorService != null) { + executorService.shutdownNow(); + } } @Override @@ -172,37 +177,63 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService).getInputStream(); } + @Override + public void onToEdgeSessionMsg(TenantId tenantId, EdgeSessionMsg msg) { + executorService.execute(() -> { + switch (msg.getMsgType()) { + case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG: + EdgeEventUpdateMsg edgeEventUpdateMsg = (EdgeEventUpdateMsg) msg; + log.trace("[{}] onToEdgeSessionMsg [{}]", edgeEventUpdateMsg.getTenantId(), msg); + onEdgeEvent(tenantId, edgeEventUpdateMsg.getEdgeId()); + break; + case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG: + ToEdgeSyncRequest toEdgeSyncRequest = (ToEdgeSyncRequest) msg; + log.trace("[{}] toEdgeSyncRequest [{}]", toEdgeSyncRequest.getTenantId(), msg); + startSyncProcess(tenantId, toEdgeSyncRequest.getEdgeId(), toEdgeSyncRequest.getId()); + break; + case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG: + FromEdgeSyncResponse fromEdgeSyncResponse = (FromEdgeSyncResponse) msg; + log.trace("[{}] fromEdgeSyncResponse [{}]", fromEdgeSyncResponse.getTenantId(), msg); + processSyncResponse(fromEdgeSyncResponse); + break; + } + }); + } + @Override public void updateEdge(TenantId tenantId, Edge edge) { - EdgeGrpcSession session = sessions.get(edge.getId()); - if (session != null && session.isConnected()) { - log.debug("[{}] Updating configuration for edge [{}] [{}]", tenantId, edge.getName(), edge.getId()); - session.onConfigurationUpdate(edge); - } else { - log.debug("[{}] Session doesn't exist for edge [{}] [{}]", tenantId, edge.getName(), edge.getId()); - } + executorService.execute(() -> { + EdgeGrpcSession session = sessions.get(edge.getId()); + if (session != null && session.isConnected()) { + log.debug("[{}] Updating configuration for edge [{}] [{}]", tenantId, edge.getName(), edge.getId()); + session.onConfigurationUpdate(edge); + } else { + log.debug("[{}] Session doesn't exist for edge [{}] [{}]", tenantId, edge.getName(), edge.getId()); + } + }); } @Override public void deleteEdge(TenantId tenantId, EdgeId edgeId) { - EdgeGrpcSession session = sessions.get(edgeId); - if (session != null && session.isConnected()) { - log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId); - session.close(); - sessions.remove(edgeId); - final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); - newEventLock.lock(); - try { - sessionNewEvents.remove(edgeId); - } finally { - newEventLock.unlock(); + executorService.execute(() -> { + EdgeGrpcSession session = sessions.get(edgeId); + if (session != null && session.isConnected()) { + log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId); + session.close(); + sessions.remove(edgeId); + final Lock newEventLock = sessionNewEventsLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); + newEventLock.lock(); + try { + sessionNewEvents.remove(edgeId); + } finally { + newEventLock.unlock(); + } + cancelScheduleEdgeEventsCheck(edgeId); } - cancelScheduleEdgeEventsCheck(edgeId); - } + }); } - @Override - public void onEdgeEvent(TenantId tenantId, EdgeId edgeId) { + private void onEdgeEvent(TenantId tenantId, EdgeId edgeId) { EdgeGrpcSession session = sessions.get(edgeId); if (session != null && session.isConnected()) { log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId.getId()); @@ -235,8 +266,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i scheduleEdgeEventsCheck(edgeGrpcSession); } - @Override - public void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId) { + private void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId) { EdgeGrpcSession session = sessions.get(edgeId); if (session != null) { boolean success = false; @@ -259,7 +289,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void scheduleSyncRequestTimeout(ToEdgeSyncRequest request, UUID requestId) { log.trace("[{}] scheduling sync edge request", requestId); - syncScheduler.schedule(() -> { + executorService.schedule(() -> { log.trace("[{}] checking if sync edge request is not processed...", requestId); Consumer consumer = localSyncEdgeRequests.remove(requestId); if (consumer != null) { @@ -269,8 +299,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i }, 10, TimeUnit.SECONDS); } - @Override - public void processSyncResponse(FromEdgeSyncResponse response) { + private void processSyncResponse(FromEdgeSyncResponse response) { log.trace("[{}] Received response from sync service: [{}]", response.getId(), response); UUID requestId = response.getId(); Consumer consumer = localSyncEdgeRequests.remove(requestId); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java index 9564e195de..6c5515337e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java @@ -18,23 +18,19 @@ package org.thingsboard.server.service.edge.rpc; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.edge.EdgeSessionMsg; import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; -import java.util.UUID; import java.util.function.Consumer; public interface EdgeRpcService { + void onToEdgeSessionMsg(TenantId tenantId, EdgeSessionMsg msg); + void updateEdge(TenantId tenantId, Edge edge); void deleteEdge(TenantId tenantId, EdgeId edgeId); - void onEdgeEvent(TenantId tenantId, EdgeId edgeId); - - void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId); - void processSyncRequest(ToEdgeSyncRequest request, Consumer responseConsumer); - - void processSyncResponse(FromEdgeSyncResponse response); } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/EdgeEventUpdateMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/EdgeEventUpdateMsg.java index 3469cde65f..0285eef0b1 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/EdgeEventUpdateMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/EdgeEventUpdateMsg.java @@ -20,11 +20,9 @@ import lombok.ToString; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; -import org.thingsboard.server.common.msg.aware.TenantAwareMsg; -import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg; @ToString -public class EdgeEventUpdateMsg implements TenantAwareMsg, ToAllNodesMsg { +public class EdgeEventUpdateMsg implements EdgeSessionMsg { @Getter private final TenantId tenantId; @Getter diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/EdgeSessionMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/EdgeSessionMsg.java new file mode 100644 index 0000000000..c4719f9b29 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/EdgeSessionMsg.java @@ -0,0 +1,24 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.edge; + +import org.thingsboard.server.common.msg.aware.TenantAwareMsg; +import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg; + +import java.io.Serializable; + +public interface EdgeSessionMsg extends TenantAwareMsg, ToAllNodesMsg { +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java index 4e6d456d22..e93e5cd3b5 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java @@ -20,14 +20,12 @@ import lombok.Getter; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; -import org.thingsboard.server.common.msg.aware.TenantAwareMsg; -import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg; import java.util.UUID; @AllArgsConstructor @Getter -public class FromEdgeSyncResponse implements TenantAwareMsg, ToAllNodesMsg { +public class FromEdgeSyncResponse implements EdgeSessionMsg { private final UUID id; private final TenantId tenantId; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java index 6e1b0df3ad..32e1068e73 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java @@ -20,14 +20,12 @@ import lombok.Getter; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; -import org.thingsboard.server.common.msg.aware.TenantAwareMsg; -import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg; import java.util.UUID; @AllArgsConstructor @Getter -public class ToEdgeSyncRequest implements TenantAwareMsg, ToAllNodesMsg { +public class ToEdgeSyncRequest implements EdgeSessionMsg { private final UUID id; private final TenantId tenantId; private final EdgeId edgeId; From d85737b84b636d5e4b92dd727bd53af500002e47 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 3 Oct 2022 18:36:52 +0300 Subject: [PATCH 04/15] TTL and table partitioning for audit logs --- .../main/data/upgrade/3.4.1/schema_update.sql | 68 +++++++++ .../install/ThingsboardInstallService.java | 4 + .../install/SqlDatabaseUpgradeService.java | 32 +++-- .../update/DefaultDataUpdateService.java | 24 +++- .../service/ttl/AuditLogsCleanUpService.java | 63 +++++++++ application/src/main/resources/logback.xml | 1 + .../src/main/resources/thingsboard.yml | 6 + .../common/util/ThrowingConsumer.java | 22 +++ .../server/dao/audit/AuditLogDao.java | 5 + ...paAbstractDaoListeningExecutorService.java | 4 + .../server/dao/sql/audit/JpaAuditLogDao.java | 63 ++++++++- .../server/dao/sql/event/JpaBaseEventDao.java | 56 ++------ .../sql/event/SqlEventCleanupRepository.java | 119 ++-------------- .../insert/sql/SqlPartitioningRepository.java | 130 +++++++++++++++++- .../resources/sql/schema-entities-idx.sql | 2 +- .../main/resources/sql/schema-entities.sql | 4 +- 16 files changed, 430 insertions(+), 173 deletions(-) create mode 100644 application/src/main/data/upgrade/3.4.1/schema_update.sql create mode 100644 application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java create mode 100644 common/util/src/main/java/org/thingsboard/common/util/ThrowingConsumer.java diff --git a/application/src/main/data/upgrade/3.4.1/schema_update.sql b/application/src/main/data/upgrade/3.4.1/schema_update.sql new file mode 100644 index 0000000000..1b75a1c1e1 --- /dev/null +++ b/application/src/main/data/upgrade/3.4.1/schema_update.sql @@ -0,0 +1,68 @@ +-- +-- Copyright © 2016-2022 The Thingsboard Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +CREATE TABLE IF NOT EXISTS tmp_audit_log ( + id uuid NOT NULL, + created_time bigint NOT NULL, + tenant_id uuid, + customer_id uuid, + entity_id uuid, + entity_type varchar(255), + entity_name varchar(255), + user_id uuid, + user_name varchar(255), + action_type varchar(255), + action_data varchar(1000000), + action_status varchar(255), + action_failure_details varchar(1000000) +) PARTITION BY RANGE (created_time); +CREATE INDEX IF NOT EXISTS idx_tmp_audit_log_tenant_id_and_created_time ON tmp_audit_log(tenant_id, created_time DESC); + +CREATE OR REPLACE PROCEDURE rename_old_audit_logs_partitions() + LANGUAGE plpgsql AS +$$ +DECLARE + table_partition RECORD; +BEGIN + FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts + FROM pg_tables WHERE tablename LIKE 'audit_log_%' + LOOP + EXECUTE format('ALTER TABLE %s RENAME TO old_audit_log_%s', table_partition.name, table_partition.partition_ts); + END LOOP; +END; +$$; + +CREATE OR REPLACE PROCEDURE migrate_audit_logs(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT) + LANGUAGE plpgsql AS +$$ +DECLARE + p RECORD; + partition_end_ts BIGINT; +BEGIN + FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM audit_log + WHERE created_time >= start_time_ms AND created_time < end_time_ms + LOOP + partition_end_ts = p.partition_ts + partition_size_ms; + RAISE NOTICE '[audit_log] Partition to create : [%-%]', p.partition_ts, partition_end_ts; + EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF tmp_audit_log ' || + 'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts); + END LOOP; + + INSERT INTO tmp_audit_log + SELECT * FROM audit_log + WHERE created_time >= start_time_ms AND created_time < end_time_ms; +END; +$$; diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 2c3251834f..33a6d4bdd1 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -225,6 +225,10 @@ public class ThingsboardInstallService { log.info("Upgrading ThingsBoard from version 3.4.0 to 3.4.1 ..."); databaseEntitiesUpgradeService.upgradeDatabase("3.4.0"); dataUpdateService.updateData("3.4.0"); + case "3.4.1": + log.info("Upgrading ThingsBoard from version 3.4.1 to 3.4.2 ..."); + databaseEntitiesUpgradeService.upgradeDatabase("3.4.1"); + dataUpdateService.updateData("3.4.1"); log.info("Updating system data..."); systemDataLoaderService.updateSystemWidgets(); break; diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index ea976b2dec..c3b697f002 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -15,8 +15,6 @@ */ package org.thingsboard.server.service.install; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -24,11 +22,9 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThrowingConsumer; import org.thingsboard.server.common.data.EntitySubtype; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.Tenant; -import org.thingsboard.server.common.data.TenantProfile; -import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; @@ -37,8 +33,6 @@ import org.thingsboard.server.common.data.queue.ProcessingStrategyType; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.queue.SubmitStrategy; import org.thingsboard.server.common.data.queue.SubmitStrategyType; -import org.thingsboard.server.common.data.rule.RuleNode; -import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.dao.dashboard.DashboardService; import org.thingsboard.server.dao.device.DeviceProfileService; import org.thingsboard.server.dao.device.DeviceService; @@ -61,11 +55,8 @@ import java.sql.SQLException; import java.sql.SQLSyntaxErrorException; import java.sql.SQLWarning; import java.sql.Statement; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static org.thingsboard.server.service.install.DatabaseHelper.ADDITIONAL_INFO; import static org.thingsboard.server.service.install.DatabaseHelper.ASSIGNED_CUSTOMERS; @@ -609,11 +600,32 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService log.error("Failed updating schema!!!", e); } break; + case "3.4.1": + execute(connection -> { + log.info("Updating schema ..."); + runSchemaUpdateScript(connection, "3.4.1"); + connection.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3004002;"); + log.info("Schema updated."); + }); + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } } + private void execute(ThrowingConsumer function) { + try (Connection connection = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { + function.accept(connection); + } catch (Exception e) { + log.error("Failed to update schema!", e); + } + } + + private void runSchemaUpdateScript(Connection connection, String version) throws Exception { + Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", version, SCHEMA_UPDATE_SQL); + loadSql(schemaUpdateFile, connection); + } + private void loadSql(Path sqlFile, Connection conn) throws Exception { String sql = new String(Files.readAllBytes(sqlFile), Charset.forName("UTF-8")); Statement st = conn.createStatement(); diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index d02f50b37b..b307489de6 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -63,6 +63,7 @@ import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.alarm.AlarmDao; +import org.thingsboard.server.dao.audit.AuditLogDao; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.event.EventService; @@ -138,6 +139,9 @@ public class DefaultDataUpdateService implements DataUpdateService { @Autowired private EventService eventService; + @Autowired + private AuditLogDao auditLogDao; + @Override public void updateData(String fromVersion) throws Exception { switch (fromVersion) { @@ -170,12 +174,19 @@ public class DefaultDataUpdateService implements DataUpdateService { rateLimitsUpdater.updateEntities(); break; case "3.4.0": - String skipEventsMigration = System.getenv("TB_SKIP_EVENTS_MIGRATION"); - if (skipEventsMigration == null || skipEventsMigration.equalsIgnoreCase("false")) { + boolean skipEventsMigration = getEnv("TB_SKIP_EVENTS_MIGRATION", false); + if (!skipEventsMigration) { log.info("Updating data from version 3.4.0 to 3.4.1 ..."); eventService.migrateEvents(); } break; + case "3.4.1": + boolean skipAuditLogsMigration = getEnv("TB_SKIP_AUDIT_LOGS_MIGRATION", false); + if (!skipAuditLogsMigration) { + log.info("Updating data from version 3.4.1 to 3.4.2 ..."); + auditLogDao.migrateAuditLogs(); + } + break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); } @@ -645,4 +656,13 @@ public class DefaultDataUpdateService implements DataUpdateService { return mainQueueConfiguration; } + private boolean getEnv(String name, boolean defaultValue) { + String env = System.getenv(name); + if (env == null) { + return defaultValue; + } else { + return Boolean.parseBoolean(env); + } + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java new file mode 100644 index 0000000000..9df77be0d2 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java @@ -0,0 +1,63 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.ttl; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.thingsboard.server.dao.audit.AuditLogDao; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; +import org.thingsboard.server.queue.discovery.PartitionService; + +import java.util.concurrent.TimeUnit; + +import static org.thingsboard.server.dao.model.ModelConstants.AUDIT_LOG_COLUMN_FAMILY_NAME; + +@Service +@ConditionalOnProperty(name = "sql.ttl.audit_logs.enabled", havingValue = "true") +@ConditionalOnExpression("${sql.ttl.audit_logs.ttl:0} > 0") +@Slf4j +public class AuditLogsCleanUpService extends AbstractCleanUpService { + + private final AuditLogDao auditLogDao; + private final SqlPartitioningRepository partitioningRepository; + + @Value("${sql.ttl.audit_logs.ttl:0}") + private long ttlInSec; + @Value("${sql.audit_logs.partition_size:168}") + private int partitionSizeInHours; + + public AuditLogsCleanUpService(PartitionService partitionService, AuditLogDao auditLogDao, SqlPartitioningRepository partitioningRepository) { + super(partitionService); + this.auditLogDao = auditLogDao; + this.partitioningRepository = partitioningRepository; + } + + @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.audit_logs.execution_interval_ms})}", + fixedDelayString = "${sql.ttl.audit_logs.execution_interval_ms}") + public void cleanUp() { + long auditLogsExpTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec); + if (isSystemTenantPartitionMine()) { + auditLogDao.cleanUpAuditLogs(auditLogsExpTime); + } else { + partitioningRepository.cleanupPartitionsCache(AUDIT_LOG_COLUMN_FAMILY_NAME, auditLogsExpTime, TimeUnit.HOURS.toMillis(partitionSizeInHours)); + } + } + +} diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index 941bd7d278..0bf081b35a 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -28,6 +28,7 @@ + diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 9edd5d0ed5..da76dae28a 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -265,6 +265,8 @@ sql: batch_size: "${SQL_EDGE_EVENTS_BATCH_SIZE:1000}" batch_max_delay: "${SQL_EDGE_EVENTS_BATCH_MAX_DELAY_MS:100}" stats_print_interval_ms: "${SQL_EDGE_EVENTS_BATCH_STATS_PRINT_MS:10000}" + audit_logs: + partition_size: "${SQL_AUDIT_LOGS_PARTITION_SIZE_HOURS:168}" # Default value - 1 week # Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks batch_sort: "${SQL_BATCH_SORT:false}" # Specify whether to remove null characters from strValue of attributes and timeseries before insert @@ -301,6 +303,10 @@ sql: rpc: enabled: "${SQL_TTL_RPC_ENABLED:true}" checking_interval: "${SQL_RPC_TTL_CHECKING_INTERVAL:7200000}" # Number of milliseconds. The current value corresponds to two hours + audit_logs: + enabled: "${SQL_TTL_AUDIT_LOGS_ENABLED:true}" + ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. Accuracy of the cleanup depends on the sql.audit_logs.partition_size + execution_interval_ms: "${SQL_TTL_AUDIT_LOGS_EXECUTION_INTERVAL_MS:86400000}" # Default value - 1 day relations: max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # //This value has to be reasonable small to prevent infinite recursion as early as possible diff --git a/common/util/src/main/java/org/thingsboard/common/util/ThrowingConsumer.java b/common/util/src/main/java/org/thingsboard/common/util/ThrowingConsumer.java new file mode 100644 index 0000000000..ad067e48c6 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/ThrowingConsumer.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +public interface ThrowingConsumer { + + void accept(T t) throws Exception; + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java index 939bc654b6..3f9b1ca01a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogDao.java @@ -39,4 +39,9 @@ public interface AuditLogDao extends Dao { PageData findAuditLogsByTenantIdAndUserId(UUID tenantId, UserId userId, List actionTypes, TimePageLink pageLink); PageData findAuditLogsByTenantId(UUID tenantId, List actionTypes, TimePageLink pageLink); + + void cleanUpAuditLogs(long expTime); + + void migrateAuditLogs(); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java index d81e1a1323..b23be1a9cf 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/JpaAbstractDaoListeningExecutorService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.dao.sql; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.JdbcTemplate; import javax.sql.DataSource; import java.sql.SQLException; @@ -32,6 +33,9 @@ public abstract class JpaAbstractDaoListeningExecutorService { @Autowired protected DataSource dataSource; + @Autowired + protected JdbcTemplate jdbcTemplate; + protected void printWarnings(Statement statement) throws SQLException { SQLWarning warnings = statement.getWarnings(); if (warnings != null) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java index 9c72bec397..94e9e6718f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java @@ -16,8 +16,11 @@ package org.thingsboard.server.dao.sql.audit; import com.google.common.util.concurrent.ListenableFuture; -import org.springframework.beans.factory.annotation.Autowired; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; @@ -28,18 +31,31 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.audit.AuditLogDao; +import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.sql.AuditLogEntity; import org.thingsboard.server.dao.sql.JpaAbstractDao; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; import java.util.List; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.TimeUnit; @Component +@RequiredArgsConstructor +@Slf4j public class JpaAuditLogDao extends JpaAbstractDao implements AuditLogDao { - @Autowired - private AuditLogRepository auditLogRepository; + private final AuditLogRepository auditLogRepository; + private final SqlPartitioningRepository partitioningRepository; + private final JdbcTemplate jdbcTemplate; + + @Value("${sql.audit_logs.partition_size:168}") + private int partitionSizeInHours; + @Value("${sql.ttl.audit_logs.ttl:0}") + private long ttlInSec; + + private static final String TABLE_NAME = ModelConstants.AUDIT_LOG_COLUMN_FAMILY_NAME; @Override protected Class getEntityClass() { @@ -54,6 +70,7 @@ public class JpaAuditLogDao extends JpaAbstractDao imp @Override public ListenableFuture saveByTenantId(AuditLog auditLog) { return service.submit(() -> { + partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); save(auditLog.getTenantId(), auditLog); return null; }); @@ -113,4 +130,44 @@ public class JpaAuditLogDao extends JpaAbstractDao imp actionTypes, DaoUtil.toPageable(pageLink))); } + + @Override + public void cleanUpAuditLogs(long expTime) { + partitioningRepository.dropPartitionsBefore(TABLE_NAME, expTime, TimeUnit.HOURS.toMillis(partitionSizeInHours)); + } + + @Override + public void migrateAuditLogs() { + long startTime = ttlInSec > 0 ? System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec) : 1480982400000L; + + long currentTime = System.currentTimeMillis(); + var partitionStepInMs = TimeUnit.HOURS.toMillis(partitionSizeInHours); + long numberOfPartitions = (currentTime - startTime) / partitionStepInMs; + + if (numberOfPartitions > 1000) { + String error = "Please adjust your audit logs partitioning configuration. Configuration with partition size " + + "of " + partitionSizeInHours + " hours and corresponding TTL will use " + numberOfPartitions + " " + + "(> 1000) partitions which is not recommended!"; + log.error(error); + throw new RuntimeException(error); + } + + jdbcTemplate.execute("CALL rename_old_audit_logs_partitions()"); + while (startTime < currentTime) { + var endTime = startTime + partitionStepInMs; + log.info("Migrating audit logs for time period: {} - {}", startTime, endTime); + callMigrationFunction(startTime, endTime, partitionStepInMs); + startTime = endTime; + } + log.info("Audit logs migration finished"); + + jdbcTemplate.execute("DROP TABLE IF EXISTS audit_log"); + jdbcTemplate.execute("ALTER TABLE tmp_audit_log RENAME TO audit_log"); + jdbcTemplate.execute("ALTER INDEX IF EXISTS idx_tmp_audit_log_tenant_id_and_created_time RENAME TO idx_audit_log_tenant_id_and_created_time"); + } + + private void callMigrationFunction(long startTime, long endTime, long partitionSizeInMs) { + jdbcTemplate.update("CALL migrate_audit_logs(?, ?, ?)", startTime, endTime, partitionSizeInMs); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java index a9a3e25db2..902184eb13 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java @@ -18,10 +18,8 @@ package org.thingsboard.server.dao.sql.event; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.hibernate.exception.ConstraintViolationException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.event.ErrorEventFilter; @@ -43,7 +41,6 @@ import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; -import org.thingsboard.server.dao.timeseries.SqlPartition; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -53,7 +50,6 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; /** @@ -63,9 +59,6 @@ import java.util.function.Function; @Component public class JpaBaseEventDao implements EventDao { - private final Map> partitionsByEventType = new ConcurrentHashMap<>(); - private static final ReentrantLock partitionCreationLock = new ReentrantLock(); - @Autowired private EventPartitionConfiguration partitionConfiguration; @@ -120,9 +113,6 @@ public class JpaBaseEventDao implements EventDao { @PostConstruct private void init() { - for (EventType eventType : EventType.values()) { - partitionsByEventType.put(eventType, new ConcurrentHashMap<>()); - } TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder() .logName("Events") .batchSize(batchSize) @@ -163,42 +153,11 @@ public class JpaBaseEventDao implements EventDao { event.setCreatedTime(System.currentTimeMillis()); } } - savePartitionIfNotExist(event); + partitioningRepository.createPartitionIfNotExists(event.getType().getTable(), event.getCreatedTime(), + partitionConfiguration.getPartitionSizeInMs(event.getType())); return queue.add(event); } - private void savePartitionIfNotExist(Event event) { - EventType type = event.getType(); - var partitionsMap = partitionsByEventType.get(type); - var partitionDuration = partitionConfiguration.getPartitionSizeInMs(type); - long partitionStartTs = event.getCreatedTime() - (event.getCreatedTime() % partitionDuration); - if (partitionsMap.get(partitionStartTs) == null) { - savePartition(partitionsMap, new SqlPartition(type.getTable(), partitionStartTs, partitionStartTs + partitionDuration, Long.toString(partitionStartTs))); - } - } - - private void savePartition(Map partitionsMap, SqlPartition sqlPartition) { - if (!partitionsMap.containsKey(sqlPartition.getStart())) { - partitionCreationLock.lock(); - try { - log.trace("Saving partition: {}", sqlPartition); - partitioningRepository.save(sqlPartition); - log.trace("Adding partition to map: {}", sqlPartition); - partitionsMap.put(sqlPartition.getStart(), sqlPartition); - } catch (DataIntegrityViolationException ex) { - log.trace("Error occurred during partition save:", ex); - if (ex.getCause() instanceof ConstraintViolationException) { - log.warn("Saving partition [{}] rejected. Event data will save to the DEFAULT partition.", sqlPartition.getPartitionDate()); - partitionsMap.put(sqlPartition.getStart(), sqlPartition); - } else { - throw new RuntimeException(ex); - } - } finally { - partitionCreationLock.unlock(); - } - } - } - @Override public PageData findEvents(UUID tenantId, UUID entityId, EventType eventType, TimePageLink pageLink) { return DaoUtil.toPageData(getEventRepository(eventType).findEvents(tenantId, entityId, pageLink.getStartTime(), pageLink.getEndTime(), DaoUtil.toPageable(pageLink, EventEntity.eventColumnMap))); @@ -436,23 +395,24 @@ public class JpaBaseEventDao implements EventDao { log.info("Going to cleanup regular events with exp time: {}", regularEventExpTs); if (cleanupDb) { eventCleanupRepository.cleanupEvents(regularEventExpTs, false); + } else { + cleanupPartitionsCache(regularEventExpTs, false); } - cleanupPartitions(regularEventExpTs, false); } if (debugEventExpTs > 0) { log.info("Going to cleanup debug events with exp time: {}", debugEventExpTs); if (cleanupDb) { eventCleanupRepository.cleanupEvents(debugEventExpTs, true); + } else { + cleanupPartitionsCache(debugEventExpTs, true); } - cleanupPartitions(debugEventExpTs, true); } } - private void cleanupPartitions(long expTime, boolean isDebug) { + private void cleanupPartitionsCache(long expTime, boolean isDebug) { for (EventType eventType : EventType.values()) { if (eventType.isDebug() == isDebug) { - Map partitions = partitionsByEventType.get(eventType); - partitions.keySet().removeIf(startTs -> startTs + partitionConfiguration.getPartitionSizeInMs(eventType) < expTime); + partitioningRepository.cleanupPartitionsCache(eventType.getTable(), expTime, partitionConfiguration.getPartitionSizeInMs(eventType)); } } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java index 53786c448b..57073d34c5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/SqlEventCleanupRepository.java @@ -17,16 +17,12 @@ package org.thingsboard.server.dao.sql.event; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DataAccessException; import org.springframework.stereotype.Repository; import org.thingsboard.server.common.data.event.EventType; import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.TimeUnit; @@ -34,13 +30,10 @@ import java.util.concurrent.TimeUnit; @Repository public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorService implements EventCleanupRepository { - private static final String SELECT_PARTITIONS_STMT = "SELECT tablename from pg_tables WHERE schemaname = 'public' and tablename like concat(?, '_%')"; - private static final int PSQL_VERSION_14 = 140000; - @Autowired private EventPartitionConfiguration partitionConfiguration; - - private volatile Integer currentServerVersion; + @Autowired + private SqlPartitioningRepository partitioningRepository; @Override public void cleanupEvents(long eventExpTime, boolean debug) { @@ -59,16 +52,13 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe callMigrateFunctionByPartitions("regular", "migrate_regular_events", regularEventTs, partitionConfiguration.getRegularPartitionSizeInHours()); callMigrateFunctionByPartitions("debug", "migrate_debug_events", debugEventTs, partitionConfiguration.getDebugPartitionSizeInHours()); - try (Connection connection = dataSource.getConnection(); - PreparedStatement dropFunction1 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_regular_events(bigint, bigint, int)"); - PreparedStatement dropFunction2 = connection.prepareStatement("DROP PROCEDURE IF EXISTS migrate_debug_events(bigint, bigint, int)"); - PreparedStatement dropTable = connection.prepareStatement("DROP TABLE IF EXISTS event")) { - dropFunction1.execute(); - dropFunction2.execute(); - dropTable.execute(); - } catch (SQLException e) { - log.error("SQLException occurred during drop of the `events` table", e); - throw new RuntimeException(e); + try { + jdbcTemplate.execute("DROP PROCEDURE IF EXISTS migrate_regular_events(bigint, bigint, int)"); + jdbcTemplate.execute("DROP PROCEDURE IF EXISTS migrate_debug_events(bigint, bigint, int)"); + jdbcTemplate.execute("DROP TABLE IF EXISTS event"); + } catch (DataAccessException e) { + log.error("Error occurred during drop of the `events` table", e); + throw e; } } @@ -94,13 +84,9 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe } private void callMigrateFunction(String functionName, long startTs, long endTs, int partitionSizeInHours) { - try (Connection connection = dataSource.getConnection(); - PreparedStatement stmt = connection.prepareStatement("call " + functionName + "(?,?,?)")) { - stmt.setLong(1, startTs); - stmt.setLong(2, endTs); - stmt.setInt(3, partitionSizeInHours); - stmt.execute(); - } catch (SQLException e) { + try { + jdbcTemplate.update("CALL " + functionName + "(?, ?, ?)", startTs, endTs, partitionSizeInHours); + } catch (DataAccessException e) { if (e.getMessage() == null || !e.getMessage().contains("relation \"event\" does not exist")) { log.error("[{}] SQLException occurred during execution of {} with parameters {} and {}", functionName, startTs, partitionSizeInHours, e); throw new RuntimeException(e); @@ -109,82 +95,7 @@ public class SqlEventCleanupRepository extends JpaAbstractDaoListeningExecutorSe } private void cleanupEvents(EventType eventType, long eventExpTime) { - var partitionDuration = partitionConfiguration.getPartitionSizeInMs(eventType); - List partitions = fetchPartitions(eventType); - for (var partitionTs : partitions) { - var partitionEndTs = partitionTs + partitionDuration; - if (partitionEndTs < eventExpTime) { - log.info("[{}] Detaching expired partition: [{}-{}]", eventType, partitionTs, partitionEndTs); - if (detachAndDropPartition(eventType, partitionTs)) { - log.info("[{}] Detached expired partition: {}", eventType, partitionTs); - } - } else { - log.debug("[{}] Skip valid partition: {}", eventType, partitionTs); - } - } - } - - private List fetchPartitions(EventType eventType) { - List partitions = new ArrayList<>(); - try (Connection connection = dataSource.getConnection(); - PreparedStatement stmt = connection.prepareStatement(SELECT_PARTITIONS_STMT)) { - stmt.setString(1, eventType.getTable()); - stmt.execute(); - try (ResultSet resultSet = stmt.getResultSet()) { - while (resultSet.next()) { - String partitionTableName = resultSet.getString(1); - String partitionTsStr = partitionTableName.substring(eventType.getTable().length() + 1); - try { - partitions.add(Long.parseLong(partitionTsStr)); - } catch (NumberFormatException nfe) { - log.warn("Failed to parse table name: {}", partitionTableName); - } - } - } - } catch (SQLException e) { - log.error("SQLException occurred during events TTL task execution ", e); - } - return partitions; - } - - private boolean detachAndDropPartition(EventType eventType, long partitionTs) { - String tablePartition = eventType.getTable() + "_" + partitionTs; - String detachPsqlStmtStr = "ALTER TABLE " + eventType.getTable() + " DETACH PARTITION " + tablePartition; - if (getCurrentServerVersion() >= PSQL_VERSION_14) { - detachPsqlStmtStr += " CONCURRENTLY"; - } - - String dropStmtStr = "DROP TABLE " + tablePartition; - try (Connection connection = dataSource.getConnection(); - PreparedStatement detachStmt = connection.prepareStatement(detachPsqlStmtStr); - PreparedStatement dropStmt = connection.prepareStatement(dropStmtStr)) { - detachStmt.execute(); - dropStmt.execute(); - return true; - } catch (SQLException e) { - log.error("[{}] SQLException occurred during detach and drop of the partition: {}", eventType, partitionTs, e); - } - return false; - } - - private synchronized int getCurrentServerVersion() { - if (currentServerVersion == null) { - try (Connection connection = dataSource.getConnection(); - PreparedStatement versionStmt = connection.prepareStatement("SELECT current_setting('server_version_num')")) { - versionStmt.execute(); - try (ResultSet resultSet = versionStmt.getResultSet()) { - while (resultSet.next()) { - currentServerVersion = resultSet.getInt(1); - } - } - } catch (SQLException e) { - log.warn("SQLException occurred during fetch of the server version", e); - } - if (currentServerVersion == null) { - currentServerVersion = 0; - } - } - return currentServerVersion; + partitioningRepository.dropPartitionsBefore(eventType.getTable(), eventExpTime, partitionConfiguration.getPartitionSizeInMs(eventType)); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index 805e5e4f4f..988b03f7fd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -15,23 +15,147 @@ */ package org.thingsboard.server.dao.sqlts.insert.sql; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.hibernate.exception.ConstraintViolationException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Repository; import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.timeseries.SqlPartition; -import org.thingsboard.server.dao.util.SqlTsDao; import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; @Repository -@Transactional +@Slf4j public class SqlPartitioningRepository { - + // todo: check INSTALL , compare to events @PersistenceContext private EntityManager entityManager; + @Autowired + private JdbcTemplate jdbcTemplate; + + private static final String SELECT_PARTITIONS_STMT = "SELECT tablename from pg_tables WHERE schemaname = 'public' and tablename like concat(?, '_%')"; + + private static final int PSQL_VERSION_14 = 140000; + private volatile Integer currentServerVersion; + + private final Map> tablesPartitions = new ConcurrentHashMap<>(); + private final ReentrantLock partitionCreationLock = new ReentrantLock(); + + @Transactional public void save(SqlPartition partition) { entityManager.createNativeQuery(partition.getQuery()).executeUpdate(); } + @Transactional + public void createPartitionIfNotExists(String table, long entityTs, long partitionDurationMs) { + long partitionStartTs = calculatePartitionStartTime(entityTs, partitionDurationMs); + Map partitions = tablesPartitions.computeIfAbsent(table, t -> new ConcurrentHashMap<>()); + if (!partitions.containsKey(partitionStartTs)) { + SqlPartition partition = new SqlPartition(table, partitionStartTs, partitionStartTs + partitionDurationMs, Long.toString(partitionStartTs)); + partitionCreationLock.lock(); + try { + log.trace("Saving partition: {}", partition); + save(partition); + log.trace("Adding partition to map: {}", partition); + partitions.put(partition.getStart(), partition); + } catch (Exception ex) { // fixme: check + log.trace("Error occurred during partition save:", ex); +// if (ExceptionUtils.indexOfThrowable(ex, ConstraintViolationException.class) >= 0) { +// log.warn("Saving partition [{}] rejected. Data will be saved to the DEFAULT partition.", partition.getPartitionDate()); +// partitions.put(partition.getStart(), partition); +// } else { +// throw ex; +// } + } finally { + partitionCreationLock.unlock(); + } + } + } + + public void dropPartitionsBefore(String table, long ts, long partitionDurationMs) { + List partitions = fetchPartitions(table); + for (Long partitionStartTime : partitions) { + long partitionEndTime = partitionStartTime + partitionDurationMs; + if (partitionEndTime < ts) { + log.info("[{}] Detaching expired partition: [{}-{}]", table, partitionStartTime, partitionEndTime); + boolean success = detachAndDropPartition(table, partitionStartTime); + if (success) { + log.info("[{}] Detached expired partition: {}", table, partitionStartTime); + } + } else { + log.debug("[{}] Skipping valid partition: {}", table, partitionStartTime); + } + } + } + + public void cleanupPartitionsCache(String table, long expTime, long partitionDurationMs) { + Map partitions = tablesPartitions.get(table); + if (partitions == null) return; + partitions.keySet().removeIf(startTime -> (startTime + partitionDurationMs) < expTime); + } + + private boolean detachAndDropPartition(String table, long partitionTs) { + Map cachedPartitions = tablesPartitions.get(table); + if (cachedPartitions != null) cachedPartitions.remove(partitionTs); + + String tablePartition = table + "_" + partitionTs; + String detachPsqlStmtStr = "ALTER TABLE " + table + " DETACH PARTITION " + tablePartition; + if (getCurrentServerVersion() >= PSQL_VERSION_14) { + detachPsqlStmtStr += " CONCURRENTLY"; + } + + String dropStmtStr = "DROP TABLE " + tablePartition; + try { + jdbcTemplate.execute(detachPsqlStmtStr); + jdbcTemplate.execute(dropStmtStr); + return true; + } catch (DataAccessException e) { + log.error("[{}] Error occurred trying to detach and drop the partition {} ", table, partitionTs, e); + } + return false; + } + + private List fetchPartitions(String table) { + // todo: test + List partitions = new ArrayList<>(); + List partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, new Object[]{table}, String.class); + for (String partitionTableName : partitionsTables) { + String partitionTsStr = partitionTableName.substring(table.length() + 1); + try { + partitions.add(Long.parseLong(partitionTsStr)); + } catch (NumberFormatException nfe) { + log.warn("Failed to parse table name: {}", partitionTableName); + } + } + return partitions; + } + + private long calculatePartitionStartTime(long ts, long partitionDuration) { + return ts - (ts % partitionDuration); + } + + private synchronized int getCurrentServerVersion() { + if (currentServerVersion == null) { + try { + currentServerVersion = jdbcTemplate.queryForObject("SELECT current_setting('server_version_num')", Integer.class); + } catch (Exception e) { + log.warn("Error occurred during fetch of the server version", e); + } + if (currentServerVersion == null) { + currentServerVersion = 0; + } + } + return currentServerVersion; + } + } diff --git a/dao/src/main/resources/sql/schema-entities-idx.sql b/dao/src/main/resources/sql/schema-entities-idx.sql index 80a5b03b92..34862e5af3 100644 --- a/dao/src/main/resources/sql/schema-entities-idx.sql +++ b/dao/src/main/resources/sql/schema-entities-idx.sql @@ -48,7 +48,7 @@ CREATE INDEX IF NOT EXISTS idx_asset_type ON asset(tenant_id, type); CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribute_kv(entity_id, attribute_key, last_update_ts desc); -CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time); +CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC); CREATE INDEX IF NOT EXISTS idx_rpc_tenant_id_device_id ON rpc(tenant_id, device_id); diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index e93221508e..8892f4e3f0 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -89,7 +89,7 @@ CREATE TABLE IF NOT EXISTS asset ( ); CREATE TABLE IF NOT EXISTS audit_log ( - id uuid NOT NULL CONSTRAINT audit_log_pkey PRIMARY KEY, + id uuid NOT NULL, created_time bigint NOT NULL, tenant_id uuid, customer_id uuid, @@ -102,7 +102,7 @@ CREATE TABLE IF NOT EXISTS audit_log ( action_data varchar(1000000), action_status varchar(255), action_failure_details varchar(1000000) -); +) PARTITION BY RANGE (created_time); CREATE TABLE IF NOT EXISTS attribute_kv ( entity_type varchar(255), From c7c8ea0105c109e9a1d2076750838df3399c6a6d Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 4 Oct 2022 10:45:43 +0300 Subject: [PATCH 05/15] Fix partition overlap error handling in SqlPartitioningRepository --- .../main/data/upgrade/3.4.1/schema_update.sql | 46 +++++++++++-------- application/src/main/resources/logback.xml | 1 - .../server/dao/sql/audit/JpaAuditLogDao.java | 5 +- .../insert/sql/SqlPartitioningRepository.java | 22 ++++----- 4 files changed, 37 insertions(+), 37 deletions(-) diff --git a/application/src/main/data/upgrade/3.4.1/schema_update.sql b/application/src/main/data/upgrade/3.4.1/schema_update.sql index 1b75a1c1e1..79cde8167f 100644 --- a/application/src/main/data/upgrade/3.4.1/schema_update.sql +++ b/application/src/main/data/upgrade/3.4.1/schema_update.sql @@ -14,7 +14,27 @@ -- limitations under the License. -- -CREATE TABLE IF NOT EXISTS tmp_audit_log ( +DO +$$ + DECLARE table_partition RECORD; + BEGIN + -- in case of running the upgrade script a second time: + IF NOT (SELECT exists(SELECT FROM pg_tables WHERE tablename = 'old_audit_log')) THEN + ALTER TABLE audit_log RENAME TO old_audit_log; + ALTER INDEX IF EXISTS idx_audit_log_tenant_id_and_created_time RENAME TO idx_old_audit_log_tenant_id_and_created_time; + + FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts + FROM pg_tables WHERE tablename LIKE 'audit_log_%' + LOOP + EXECUTE format('ALTER TABLE %s RENAME TO old_audit_log_%s', table_partition.name, table_partition.partition_ts); + END LOOP; + ELSE + RAISE NOTICE 'Table old_audit_log already exists, leaving as is'; + END IF; + END; +$$; + +CREATE TABLE IF NOT EXISTS audit_log ( id uuid NOT NULL, created_time bigint NOT NULL, tenant_id uuid, @@ -29,21 +49,7 @@ CREATE TABLE IF NOT EXISTS tmp_audit_log ( action_status varchar(255), action_failure_details varchar(1000000) ) PARTITION BY RANGE (created_time); -CREATE INDEX IF NOT EXISTS idx_tmp_audit_log_tenant_id_and_created_time ON tmp_audit_log(tenant_id, created_time DESC); - -CREATE OR REPLACE PROCEDURE rename_old_audit_logs_partitions() - LANGUAGE plpgsql AS -$$ -DECLARE - table_partition RECORD; -BEGIN - FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts - FROM pg_tables WHERE tablename LIKE 'audit_log_%' - LOOP - EXECUTE format('ALTER TABLE %s RENAME TO old_audit_log_%s', table_partition.name, table_partition.partition_ts); - END LOOP; -END; -$$; +CREATE INDEX IF NOT EXISTS idx_audit_log_tenant_id_and_created_time ON audit_log(tenant_id, created_time DESC); CREATE OR REPLACE PROCEDURE migrate_audit_logs(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT) LANGUAGE plpgsql AS @@ -52,17 +58,17 @@ DECLARE p RECORD; partition_end_ts BIGINT; BEGIN - FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM audit_log + FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_audit_log WHERE created_time >= start_time_ms AND created_time < end_time_ms LOOP partition_end_ts = p.partition_ts + partition_size_ms; RAISE NOTICE '[audit_log] Partition to create : [%-%]', p.partition_ts, partition_end_ts; - EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF tmp_audit_log ' || + EXECUTE format('CREATE TABLE IF NOT EXISTS audit_log_%s PARTITION OF audit_log ' || 'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts); END LOOP; - INSERT INTO tmp_audit_log - SELECT * FROM audit_log + INSERT INTO audit_log + SELECT * FROM old_audit_log WHERE created_time >= start_time_ms AND created_time < end_time_ms; END; $$; diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index 0bf081b35a..941bd7d278 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -28,7 +28,6 @@ - diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java index a3ee6ea843..365dd82686 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java @@ -154,7 +154,6 @@ public class JpaAuditLogDao extends JpaAbstractDao imp throw new RuntimeException(error); } - jdbcTemplate.execute("CALL rename_old_audit_logs_partitions()"); while (startTime < currentTime) { var endTime = startTime + partitionStepInMs; log.info("Migrating audit logs for time period: {} - {}", startTime, endTime); @@ -163,9 +162,7 @@ public class JpaAuditLogDao extends JpaAbstractDao imp } log.info("Audit logs migration finished"); - jdbcTemplate.execute("DROP TABLE IF EXISTS audit_log"); - jdbcTemplate.execute("ALTER TABLE tmp_audit_log RENAME TO audit_log"); - jdbcTemplate.execute("ALTER INDEX IF EXISTS idx_tmp_audit_log_tenant_id_and_created_time RENAME TO idx_audit_log_tenant_id_and_created_time"); + jdbcTemplate.execute("DROP TABLE IF EXISTS old_audit_log"); } private void callMigrationFunction(long startTime, long endTime, long partitionSizeInMs) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index 94097c590a..8083f17a5a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -17,7 +17,6 @@ package org.thingsboard.server.dao.sqlts.insert.sql; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.hibernate.exception.ConstraintViolationException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcTemplate; @@ -68,17 +67,16 @@ public class SqlPartitioningRepository { save(partition); log.trace("Adding partition to map: {}", partition); partitions.put(partition.getStart(), partition); - } catch (Exception ex) { // fixme: check - log.trace("Error occurred during partition save:", ex); - // todo: if partitions накладаються, потестити (ConstraintViolationException) - // todo: SKIP_MIGRATION тільки для того щоб не переносити данні, таблицю треба створити partitioned. - // fixme: update script -// if (ExceptionUtils.indexOfThrowable(ex, ConstraintViolationException.class) >= 0) { -// log.warn("Saving partition [{}] rejected. Data will be saved to the DEFAULT partition.", partition.getPartitionDate()); -// partitions.put(partition.getStart(), partition); -// } else { -// throw ex; -// } + } catch (RuntimeException e) { + log.trace("Error occurred during partition save:", e); + String msg = ExceptionUtils.getRootCauseMessage(e); + if (msg.contains("would overlap partition")) { + log.warn("Couldn't save {} partition for {}, data will be saved to the default partition. SQL error: {}", + partition.getPartitionDate(), table, msg); + partitions.put(partition.getStart(), partition); + } else { + throw e; + } } finally { partitionCreationLock.unlock(); } From 9e2e5f809e413624b50bfabeae9b8c94fb90f7dd Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 4 Oct 2022 11:03:10 +0300 Subject: [PATCH 06/15] Remove unneeded todos --- .../server/dao/sqlts/insert/sql/SqlPartitioningRepository.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index 8083f17a5a..b194a666d2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -35,7 +35,7 @@ import java.util.concurrent.locks.ReentrantLock; @Repository @Slf4j public class SqlPartitioningRepository { - // todo: check INSTALL , compare to events + @PersistenceContext private EntityManager entityManager; @@ -127,7 +127,6 @@ public class SqlPartitioningRepository { } private List fetchPartitions(String table) { - // todo: test List partitions = new ArrayList<>(); List partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, new Object[]{table}, String.class); for (String partitionTableName : partitionsTables) { From 8694611f7a5a3fc07a1bd5c3440e58c6c4e075bc Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 4 Oct 2022 11:19:38 +0300 Subject: [PATCH 07/15] Fix save method of JpaAuditLogDao --- .../server/dao/sql/audit/JpaAuditLogDao.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java index 365dd82686..1eefb21add 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/audit/JpaAuditLogDao.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.sql.audit; +import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.util.concurrent.ListenableFuture; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -24,8 +25,10 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; +import org.thingsboard.server.common.data.id.AuditLogId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.TimePageLink; @@ -72,12 +75,22 @@ public class JpaAuditLogDao extends JpaAbstractDao imp @Override public ListenableFuture saveByTenantId(AuditLog auditLog) { return service.submit(() -> { - partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); save(auditLog.getTenantId(), auditLog); return null; }); } + @Override + public AuditLog save(TenantId tenantId, AuditLog auditLog) { + if (auditLog.getId() == null) { + UUID uuid = Uuids.timeBased(); + auditLog.setId(new AuditLogId(uuid)); + auditLog.setCreatedTime(Uuids.unixTimestamp(uuid)); + } + partitioningRepository.createPartitionIfNotExists(TABLE_NAME, auditLog.getCreatedTime(), TimeUnit.HOURS.toMillis(partitionSizeInHours)); + return super.save(tenantId, auditLog); + } + @Override public PageData findAuditLogsByTenantIdAndEntityId(UUID tenantId, EntityId entityId, List actionTypes, TimePageLink pageLink) { return DaoUtil.toPageData( From 7eea5a6cda7a6922f8874b31d68c1eafd60a0f03 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 4 Oct 2022 13:07:21 +0300 Subject: [PATCH 08/15] Minor refactoring --- .../service/install/update/DefaultDataUpdateService.java | 3 +++ .../server/service/ttl/AuditLogsCleanUpService.java | 4 ++-- application/src/main/resources/thingsboard.yml | 2 +- .../dao/sqlts/insert/sql/SqlPartitioningRepository.java | 1 + 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index b307489de6..a8cf5374a2 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -184,7 +184,10 @@ public class DefaultDataUpdateService implements DataUpdateService { boolean skipAuditLogsMigration = getEnv("TB_SKIP_AUDIT_LOGS_MIGRATION", false); if (!skipAuditLogsMigration) { log.info("Updating data from version 3.4.1 to 3.4.2 ..."); + log.info("Starting audit logs migration. Can be skipped with TB_SKIP_AUDIT_LOGS_MIGRATION env variable set to true"); auditLogDao.migrateAuditLogs(); + } else { + log.info("Skipping audit logs migration"); } break; default: diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java index 9df77be0d2..54c4f6523e 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java @@ -49,8 +49,8 @@ public class AuditLogsCleanUpService extends AbstractCleanUpService { this.partitioningRepository = partitioningRepository; } - @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.audit_logs.execution_interval_ms})}", - fixedDelayString = "${sql.ttl.audit_logs.execution_interval_ms}") + @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.audit_logs.checking_interval_ms})}", + fixedDelayString = "${sql.ttl.audit_logs.checking_interval_ms}") public void cleanUp() { long auditLogsExpTime = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(ttlInSec); if (isSystemTenantPartitionMine()) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 96438470a0..cf13376946 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -308,7 +308,7 @@ sql: audit_logs: enabled: "${SQL_TTL_AUDIT_LOGS_ENABLED:true}" ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. Accuracy of the cleanup depends on the sql.audit_logs.partition_size - execution_interval_ms: "${SQL_TTL_AUDIT_LOGS_EXECUTION_INTERVAL_MS:86400000}" # Default value - 1 day + checking_interval_ms: "${SQL_TTL_AUDIT_LOGS_CHECKING_INTERVAL_MS:86400000}" # Default value - 1 day relations: max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # //This value has to be reasonable small to prevent infinite recursion as early as possible diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index b194a666d2..67b451ec2a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -63,6 +63,7 @@ public class SqlPartitioningRepository { SqlPartition partition = new SqlPartition(table, partitionStartTs, partitionStartTs + partitionDurationMs, Long.toString(partitionStartTs)); partitionCreationLock.lock(); try { + if (partitions.containsKey(partitionStartTs)) return; log.trace("Saving partition: {}", partition); save(partition); log.trace("Adding partition to map: {}", partition); From b4300b35bfae3298237ec577b364c3fea1102672 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 4 Oct 2022 15:03:05 +0300 Subject: [PATCH 09/15] Add tests for audit logs partitions --- .../BaseAuditLogControllerTest.java | 68 +++++++++++++++++++ .../resources/application-test.properties | 5 +- .../insert/sql/SqlPartitioningRepository.java | 4 +- 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java index 5369d4d30a..c46f7d9c46 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseAuditLogControllerTest.java @@ -20,18 +20,33 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.security.Authority; +import org.thingsboard.server.dao.audit.AuditLogDao; import org.thingsboard.server.dao.model.ModelConstants; +import org.thingsboard.server.dao.sqlts.insert.sql.SqlPartitioningRepository; +import org.thingsboard.server.service.ttl.AuditLogsCleanUpService; +import java.time.LocalDate; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; public abstract class BaseAuditLogControllerTest extends AbstractControllerTest { @@ -39,6 +54,18 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest private Tenant savedTenant; private User tenantAdmin; + @Autowired + private AuditLogDao auditLogDao; + @SpyBean + private SqlPartitioningRepository partitioningRepository; + @Autowired + private AuditLogsCleanUpService auditLogsCleanUpService; + + @Value("#{${sql.audit_logs.partition_size} * 60 * 60 * 1000}") + private long partitionDurationInMs; + @Value("${sql.ttl.audit_logs.ttl}") + private long auditLogsTtlInSec; + @Before public void beforeTest() throws Exception { loginSysAdmin(); @@ -145,4 +172,45 @@ public abstract class BaseAuditLogControllerTest extends AbstractControllerTest Assert.assertEquals(179, loadedAuditLogs.size()); } + + @Test + public void whenSavingNewAuditLog_thenCheckAndCreatePartitionIfNotExists() { + reset(partitioningRepository); + AuditLog auditLog = createAuditLog(ActionType.LOGIN, tenantAdminUserId); + verify(partitioningRepository).createPartitionIfNotExists(eq("audit_log"), eq(auditLog.getCreatedTime()), eq(partitionDurationInMs)); + + List partitions = partitioningRepository.fetchPartitions("audit_log"); + assertThat(partitions).singleElement().satisfies(partitionStartTs -> { + assertThat(partitionStartTs).isEqualTo(partitioningRepository.calculatePartitionStartTime(auditLog.getCreatedTime(), partitionDurationInMs)); + }); + } + + @Test + public void whenCleaningUpAuditLogsByTtl_thenDropOldPartitions() { + long oldAuditLogTs = LocalDate.of(2020, 10, 1).atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli(); + long partitionStartTs = partitioningRepository.calculatePartitionStartTime(oldAuditLogTs, partitionDurationInMs); + partitioningRepository.createPartitionIfNotExists("audit_log", oldAuditLogTs, partitionDurationInMs); + List partitions = partitioningRepository.fetchPartitions("audit_log"); + assertThat(partitions).contains(partitionStartTs); + + auditLogsCleanUpService.cleanUp(); + partitions = partitioningRepository.fetchPartitions("audit_log"); + assertThat(partitions).doesNotContain(partitionStartTs); + assertThat(partitions).allSatisfy(partitionsStart -> { + long partitionEndTs = partitionsStart + partitionDurationInMs; + assertThat(partitionEndTs).isGreaterThan(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(auditLogsTtlInSec)); + }); + } + + private AuditLog createAuditLog(ActionType actionType, EntityId entityId) { + AuditLog auditLog = new AuditLog(); + auditLog.setTenantId(tenantId); + auditLog.setCustomerId(null); + auditLog.setUserId(tenantAdminUserId); + auditLog.setEntityId(entityId); + auditLog.setUserName(tenantAdmin.getEmail()); + auditLog.setActionType(actionType); + return auditLogDao.save(tenantId, auditLog); + } + } diff --git a/application/src/test/resources/application-test.properties b/application/src/test/resources/application-test.properties index 279d1e99be..eacd1733d3 100644 --- a/application/src/test/resources/application-test.properties +++ b/application/src/test/resources/application-test.properties @@ -56,4 +56,7 @@ queue.rule-engine.queues[2].processing-strategy.retries=1 queue.rule-engine.queues[2].processing-strategy.pause-between-retries=0 queue.rule-engine.queues[2].processing-strategy.max-pause-between-retries=0 -usage.stats.report.enabled=false \ No newline at end of file +usage.stats.report.enabled=false + +sql.audit_logs.partition_size=24 +sql.ttl.audit_logs.ttl=2592000 \ No newline at end of file diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index 67b451ec2a..9b58358e3f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -127,7 +127,7 @@ public class SqlPartitioningRepository { return false; } - private List fetchPartitions(String table) { + public List fetchPartitions(String table) { List partitions = new ArrayList<>(); List partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, new Object[]{table}, String.class); for (String partitionTableName : partitionsTables) { @@ -141,7 +141,7 @@ public class SqlPartitioningRepository { return partitions; } - private long calculatePartitionStartTime(long ts, long partitionDuration) { + public long calculatePartitionStartTime(long ts, long partitionDuration) { return ts - (ts % partitionDuration); } From d86c20b3545aeca1a4d14ff7fd210f0b198c055c Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 6 Oct 2022 11:46:24 +0300 Subject: [PATCH 10/15] Additional validation for AuditLog entity --- .../org/thingsboard/server/common/data/audit/AuditLog.java | 3 +++ .../thingsboard/server/dao/audit/AuditLogServiceImpl.java | 6 +++++- .../org/thingsboard/server/dao/service/DataValidator.java | 2 +- .../thingsboard/server/dao/service/NoXssValidatorTest.java | 2 +- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/audit/AuditLog.java b/common/data/src/main/java/org/thingsboard/server/common/data/audit/AuditLog.java index e87a1a6380..f88aabaa64 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/audit/AuditLog.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/audit/AuditLog.java @@ -22,6 +22,7 @@ import lombok.Data; import lombok.EqualsAndHashCode; import org.thingsboard.server.common.data.BaseData; import org.thingsboard.server.common.data.id.*; +import org.thingsboard.server.common.data.validation.NoXss; @ApiModel @EqualsAndHashCode(callSuper = true) @@ -34,10 +35,12 @@ public class AuditLog extends BaseData { private CustomerId customerId; @ApiModelProperty(position = 5, value = "JSON object with Entity id", accessMode = ApiModelProperty.AccessMode.READ_ONLY) private EntityId entityId; + @NoXss @ApiModelProperty(position = 6, value = "Name of the logged entity", example = "Thermometer", accessMode = ApiModelProperty.AccessMode.READ_ONLY) private String entityName; @ApiModelProperty(position = 7, value = "JSON object with User id.", accessMode = ApiModelProperty.AccessMode.READ_ONLY) private UserId userId; + @NoXss @ApiModelProperty(position = 8, value = "Unique user name(email) of the user that performed some action on logged entity", example = "tenant@thingsboard.org", accessMode = ApiModelProperty.AccessMode.READ_ONLY) private String userName; @ApiModelProperty(position = 9, value = "String represented Action type", example = "ADDED", accessMode = ApiModelProperty.AccessMode.READ_ONLY) diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java index e3ea775b0e..24e7ef4c7f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java @@ -382,7 +382,11 @@ public class AuditLogServiceImpl implements AuditLogService { AuditLog auditLogEntry = createAuditLogEntry(tenantId, entityId, entityName, customerId, userId, userName, actionType, actionData, actionStatus, actionFailureDetails); log.trace("Executing logAction [{}]", auditLogEntry); - auditLogValidator.validate(auditLogEntry, AuditLog::getTenantId); + try { + auditLogValidator.validate(auditLogEntry, AuditLog::getTenantId); + } catch (Exception e) { + return Futures.immediateFailedFuture(e); + } List> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY); futures.add(auditLogDao.saveByTenantId(auditLogEntry)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/DataValidator.java b/dao/src/main/java/org/thingsboard/server/dao/service/DataValidator.java index 28223f3e7f..beb4a7ebc1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/service/DataValidator.java +++ b/dao/src/main/java/org/thingsboard/server/dao/service/DataValidator.java @@ -62,7 +62,7 @@ public abstract class DataValidator> { } return old; } catch (DataValidationException e) { - log.error("Data object is invalid: [{}]", e.getMessage()); + log.error("{} object is invalid: [{}]", data == null ? "Data" : data.getClass().getSimpleName(), e.getMessage()); throw e; } } diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/NoXssValidatorTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/NoXssValidatorTest.java index c4ef964883..36eb76edb5 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/NoXssValidatorTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/NoXssValidatorTest.java @@ -42,7 +42,7 @@ public class NoXssValidatorTest { "

Link!!!

1221", "

Please log in to proceed

Username:

Password:



", " ", - "123 bebe", + "123 bebe" }) public void testIsNotValid(String stringWithXss) { boolean isValid = validator.isValid(stringWithXss, mock(ConstraintValidatorContext.class)); From caea6aa27b728f5f31687f149869d03546301bbb Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 6 Oct 2022 13:48:06 +0300 Subject: [PATCH 11/15] Save audit log with entity name replaced if it is malformed --- .../thingsboard/server/common/data/audit/AuditLog.java | 1 - .../thingsboard/server/dao/audit/AuditLogServiceImpl.java | 8 ++++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/audit/AuditLog.java b/common/data/src/main/java/org/thingsboard/server/common/data/audit/AuditLog.java index f88aabaa64..b921ea4374 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/audit/AuditLog.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/audit/AuditLog.java @@ -40,7 +40,6 @@ public class AuditLog extends BaseData { private String entityName; @ApiModelProperty(position = 7, value = "JSON object with User id.", accessMode = ApiModelProperty.AccessMode.READ_ONLY) private UserId userId; - @NoXss @ApiModelProperty(position = 8, value = "Unique user name(email) of the user that performed some action on logged entity", example = "tenant@thingsboard.org", accessMode = ApiModelProperty.AccessMode.READ_ONLY) private String userName; @ApiModelProperty(position = 9, value = "String represented Action type", example = "ADDED", accessMode = ApiModelProperty.AccessMode.READ_ONLY) diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java index 24e7ef4c7f..da94141768 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java @@ -23,13 +23,13 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.HasName; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.audit.ActionStatus; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.AuditLog; @@ -385,7 +385,11 @@ public class AuditLogServiceImpl implements AuditLogService { try { auditLogValidator.validate(auditLogEntry, AuditLog::getTenantId); } catch (Exception e) { - return Futures.immediateFailedFuture(e); + if (StringUtils.contains(e.getMessage(), "value is malformed")) { + auditLogEntry.setEntityName("MALFORMED"); + } else { + return Futures.immediateFailedFuture(e); + } } List> futures = Lists.newArrayListWithExpectedSize(INSERTS_PER_ENTRY); futures.add(auditLogDao.saveByTenantId(auditLogEntry)); From cdd55ed170eff51ab4c6a195a4137a25af3c2a7f Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 11 Oct 2022 09:54:53 +0200 Subject: [PATCH 12/15] Update package update version for tb docker images --- msa/tb/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/msa/tb/pom.xml b/msa/tb/pom.xml index 30358d97cb..f204a6822f 100644 --- a/msa/tb/pom.xml +++ b/msa/tb/pom.xml @@ -38,7 +38,7 @@ tb-postgres tb-cassandra /usr/share/${pkg.name} - 3.3.3 + 3.4.2 From d2f67d33d176785f4d7414920983ae75e0469a58 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 12 Oct 2022 10:12:29 +0300 Subject: [PATCH 13/15] Minor refactoring for AuditLogsCleanUpService and SqlPartitioningRepository --- .../server/service/ttl/AuditLogsCleanUpService.java | 4 +--- .../dao/sqlts/insert/sql/SqlPartitioningRepository.java | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java index 54c4f6523e..11b1751c0d 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/AuditLogsCleanUpService.java @@ -18,7 +18,6 @@ package org.thingsboard.server.service.ttl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.server.dao.audit.AuditLogDao; @@ -30,8 +29,7 @@ import java.util.concurrent.TimeUnit; import static org.thingsboard.server.dao.model.ModelConstants.AUDIT_LOG_COLUMN_FAMILY_NAME; @Service -@ConditionalOnProperty(name = "sql.ttl.audit_logs.enabled", havingValue = "true") -@ConditionalOnExpression("${sql.ttl.audit_logs.ttl:0} > 0") +@ConditionalOnExpression("${sql.ttl.audit_logs.enabled:true} && ${sql.ttl.audit_logs.ttl:0} > 0") @Slf4j public class AuditLogsCleanUpService extends AbstractCleanUpService { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java index 9b58358e3f..87a62d356f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/sql/SqlPartitioningRepository.java @@ -129,7 +129,7 @@ public class SqlPartitioningRepository { public List fetchPartitions(String table) { List partitions = new ArrayList<>(); - List partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, new Object[]{table}, String.class); + List partitionsTables = jdbcTemplate.queryForList(SELECT_PARTITIONS_STMT, String.class, table); for (String partitionTableName : partitionsTables) { String partitionTsStr = partitionTableName.substring(table.length() + 1); try { From 2ed85cb2941ed8ed24874a70fd5830638bc133b0 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 12 Oct 2022 12:21:18 +0300 Subject: [PATCH 14/15] Tell edge events with normal priority. Updated sync request timeout to 20 seconds --- .../thingsboard/server/service/edge/rpc/EdgeGrpcService.java | 2 +- .../server/service/queue/DefaultTbCoreConsumerService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 70abad7ba6..4cb6fcdb0a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -296,7 +296,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i log.trace("[{}] timeout for processing sync edge request.", requestId); consumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false)); } - }, 10, TimeUnit.SECONDS); + }, 20, TimeUnit.SECONDS); } private void processSyncResponse(FromEdgeSyncResponse response) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 745ffa7bab..1162934039 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -551,7 +551,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg, TbCallback callback) { if (actorMsg.isPresent()) { log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tellWithHighPriority(actorMsg.get()); + actorContext.tell(actorMsg.get()); } callback.onSuccess(); } From 35288bbfe8cf041bcdb04789dd638589b5a5d2e8 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Wed, 12 Oct 2022 12:27:22 +0300 Subject: [PATCH 15/15] Removed required=false. Renamed test name --- .../server/service/edge/rpc/processor/BaseEdgeProcessor.java | 2 +- .../src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 2be5a5684d..40d5c8d404 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -93,7 +93,7 @@ public abstract class BaseEdgeProcessor { @Autowired protected TelemetrySubscriptionService tsSubService; - @Autowired(required = false) + @Autowired protected TbNotificationEntityService notificationEntityService; @Autowired diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index 34f71a9ffa..180a345633 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -2152,7 +2152,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { } @Test - public void updateSharedAttributeOnCloudAndValidateDeviceSubscription() throws Exception { + public void sendUpdateSharedAttributeToCloudAndValidateDeviceSubscription() throws Exception { Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); DeviceCredentials deviceCredentials = doGet("/api/device/" + device.getUuidId() + "/credentials", DeviceCredentials.class);