From 3ae5acbcc8eaa47a9459256b994a60d3626a48c2 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Thu, 22 Jul 2021 15:59:47 +0300 Subject: [PATCH 01/10] Added lock on edge event save/read. Removed save async method --- .../device/DeviceActorMessageProcessor.java | 16 +-- .../edge/DefaultEdgeNotificationService.java | 19 +-- .../service/edge/rpc/EdgeGrpcSession.java | 2 +- .../edge/rpc/processor/BaseEdgeProcessor.java | 21 +--- .../rpc/processor/DeviceEdgeProcessor.java | 15 +-- .../rpc/sync/DefaultEdgeRequestsService.java | 116 +++--------------- .../thingsboard/server/edge/BaseEdgeTest.java | 12 +- .../server/dao/edge/EdgeEventService.java | 3 +- .../server/dao/edge/BaseEdgeEventService.java | 6 +- .../server/dao/edge/EdgeEventDao.java | 4 +- .../dao/sql/edge/JpaBaseEdgeEventDao.java | 86 ++++++++----- .../dao/service/BaseEdgeEventServiceTest.java | 6 +- .../rule/engine/edge/TbMsgPushToEdgeNode.java | 17 +-- 13 files changed, 97 insertions(+), 226 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 30eb24a4d2..5ea82c8c55 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -94,11 +94,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.function.Consumer; @@ -703,18 +701,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { edgeEvent.setBody(body); edgeEvent.setEdgeId(edgeId); - ListenableFuture future = systemContext.getEdgeEventService().saveAsync(edgeEvent); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(EdgeEvent result) { - systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t); - } - }, systemContext.getDbCallbackExecutor()); + systemContext.getEdgeEventService().save(edgeEvent); + systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId); } private List toTsKvProtos(@Nullable List result) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index c861207f2d..4fd1405555 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -16,11 +16,7 @@ package org.thingsboard.server.service.edge; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.edge.Edge; @@ -123,19 +119,8 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { edgeEvent.setEntityId(entityId.getId()); } edgeEvent.setBody(body); - ListenableFuture future = edgeEventService.saveAsync(edgeEvent); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable EdgeEvent result) { - clusterService.onEdgeEventUpdate(tenantId, edgeId); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t); - } - }, dbCallbackExecutorService); - + edgeEventService.save(edgeEvent); + clusterService.onEdgeEventUpdate(tenantId, edgeId); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 32e23fddad..bec187a69b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -259,7 +259,7 @@ public final class EdgeGrpcSession implements Closeable { log.error("[{}] Msg processing failed! Error msg: {}", edge.getRoutingKey(), msg.getErrorMsg()); } if (sessionState.getPendingMsgsMap().isEmpty()) { - log.debug("[{}] Pending msgs map is empty. Stopping current iteration {}", edge.getRoutingKey(), msg); + log.debug("[{}] Pending msgs map is empty. Stopping current iteration", edge.getRoutingKey()); if (sessionState.getScheduledSendDownlinkTask() != null) { sessionState.getScheduledSendDownlinkTask().cancel(false); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index f32576499e..b2cf5d4c39 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -17,11 +17,7 @@ package org.thingsboard.server.service.edge.rpc.processor; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.server.common.data.HasCustomerId; import org.thingsboard.server.common.data.edge.Edge; @@ -178,7 +174,7 @@ public abstract class BaseEdgeProcessor { @Autowired protected DbCallbackExecutorService dbCallbackExecutorService; - protected ListenableFuture saveEdgeEvent(TenantId tenantId, + protected void saveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, EdgeEventActionType action, @@ -197,19 +193,8 @@ public abstract class BaseEdgeProcessor { edgeEvent.setEntityId(entityId.getId()); } edgeEvent.setBody(body); - ListenableFuture future = edgeEventService.saveAsync(edgeEvent); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable EdgeEvent result) { - tbClusterService.onEdgeEventUpdate(tenantId, edgeId); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t); - } - }, dbCallbackExecutorService); - return future; + edgeEventService.save(edgeEvent); + tbClusterService.onEdgeEventUpdate(tenantId, edgeId); } protected CustomerId getCustomerIdIfEdgeAssignedToCustomer(HasCustomerId hasCustomerIdEntity, Edge edge) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java index 3e2c740acc..d924795df1 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java @@ -105,19 +105,8 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { Device newDevice = createDevice(tenantId, edge, deviceUpdateMsg, newDeviceName); ObjectNode body = mapper.createObjectNode(); body.put("conflictName", deviceName); - ListenableFuture future = - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body); - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(EdgeEvent edgeEvent) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null); - } - - @Override - public void onFailure(Throwable t) { - log.error("[{}] Failed to save ENTITY_MERGE_REQUEST edge event [{}][{}]", tenantId, deviceUpdateMsg, edge.getId(), t); - } - }, dbCallbackExecutorService); + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body); + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null); } } while (pageData != null && pageData.hasNext()); } else { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index a1b686cd1a..33d6c65d05 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -122,26 +122,13 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { @Override public ListenableFuture processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) { log.trace("[{}] processRuleChainMetadataRequestMsg [{}][{}]", tenantId, edge.getName(), ruleChainMetadataRequestMsg); - SettableFuture futureToSet = SettableFuture.create(); if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) { RuleChainId ruleChainId = new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB())); - ListenableFuture future = saveEdgeEvent(tenantId, edge.getId(), + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.ADDED, ruleChainId, null); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable EdgeEvent result) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't save edge event [{}]", ruleChainMetadataRequestMsg, t); - futureToSet.setException(t); - } - }, dbCallbackExecutorService); } - return futureToSet; + return Futures.immediateFuture(null); } @Override @@ -273,82 +260,39 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { @Override public ListenableFuture processDeviceCredentialsRequestMsg(TenantId tenantId, Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg) { log.trace("[{}] processDeviceCredentialsRequestMsg [{}][{}]", tenantId, edge.getName(), deviceCredentialsRequestMsg); - SettableFuture futureToSet = SettableFuture.create(); if (deviceCredentialsRequestMsg.getDeviceIdMSB() != 0 && deviceCredentialsRequestMsg.getDeviceIdLSB() != 0) { DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB())); - ListenableFuture future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(@Nullable EdgeEvent result) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't save edge event [{}]", deviceCredentialsRequestMsg, t); - futureToSet.setException(t); - } - }, dbCallbackExecutorService); } - return futureToSet; + return Futures.immediateFuture(null); } @Override public ListenableFuture processUserCredentialsRequestMsg(TenantId tenantId, Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg) { log.trace("[{}] processUserCredentialsRequestMsg [{}][{}]", tenantId, edge.getName(), userCredentialsRequestMsg); - SettableFuture futureToSet = SettableFuture.create(); if (userCredentialsRequestMsg.getUserIdMSB() != 0 && userCredentialsRequestMsg.getUserIdLSB() != 0) { UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB())); - ListenableFuture future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, EdgeEventActionType.CREDENTIALS_UPDATED, userId, null); - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable EdgeEvent result) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't save edge event [{}]", userCredentialsRequestMsg, t); - futureToSet.setException(t); - } - }, dbCallbackExecutorService); } - return futureToSet; + return Futures.immediateFuture(null); } @Override public ListenableFuture processDeviceProfileDevicesRequestMsg(TenantId tenantId, Edge edge, DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg) { log.trace("[{}] processDeviceProfileDevicesRequestMsg [{}][{}]", tenantId, edge.getName(), deviceProfileDevicesRequestMsg); - SettableFuture futureToSet = SettableFuture.create(); if (deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB() != 0 && deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB() != 0) { DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB(), deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB())); DeviceProfile deviceProfileById = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId); - List> futures; if (deviceProfileById != null) { - futures = syncDevices(tenantId, edge, deviceProfileById.getName()); - } else { - futures = new ArrayList<>(); + syncDevices(tenantId, edge, deviceProfileById.getName()); } - Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable List result) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't sync devices by device profile [{}]", deviceProfileDevicesRequestMsg, t); - futureToSet.setException(t); - } - }, dbCallbackExecutorService); } - return futureToSet; + return Futures.immediateFuture(null); } - private List> syncDevices(TenantId tenantId, Edge edge, String deviceType) { - List> futures = new ArrayList<>(); + private void syncDevices(TenantId tenantId, Edge edge, String deviceType) { log.trace("[{}] syncDevices [{}][{}]", tenantId, edge.getName(), deviceType); try { PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); @@ -358,7 +302,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); for (Device device : pageData.getData()) { - futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null)); + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null); } if (pageData.hasNext()) { pageLink = pageLink.nextPageLink(); @@ -368,40 +312,25 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { } catch (Exception e) { log.error("Exception during loading edge device(s) on sync!", e); } - return futures; } @Override public ListenableFuture processWidgetBundleTypesRequestMsg(TenantId tenantId, Edge edge, WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg) { log.trace("[{}] processWidgetBundleTypesRequestMsg [{}][{}]", tenantId, edge.getName(), widgetBundleTypesRequestMsg); - SettableFuture futureToSet = SettableFuture.create(); if (widgetBundleTypesRequestMsg.getWidgetBundleIdMSB() != 0 && widgetBundleTypesRequestMsg.getWidgetBundleIdLSB() != 0) { WidgetsBundleId widgetsBundleId = new WidgetsBundleId(new UUID(widgetBundleTypesRequestMsg.getWidgetBundleIdMSB(), widgetBundleTypesRequestMsg.getWidgetBundleIdLSB())); WidgetsBundle widgetsBundleById = widgetsBundleService.findWidgetsBundleById(tenantId, widgetsBundleId); - List> futures = new ArrayList<>(); if (widgetsBundleById != null) { List widgetTypesToPush = widgetTypeService.findWidgetTypesByTenantIdAndBundleAlias(widgetsBundleById.getTenantId(), widgetsBundleById.getAlias()); for (WidgetType widgetType : widgetTypesToPush) { - futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null)); + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null); } } - Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable List result) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't sync widget types by widget bundle [{}]", widgetBundleTypesRequestMsg, t); - futureToSet.setException(t); - } - }, dbCallbackExecutorService); } - return futureToSet; + return Futures.immediateFuture(null); } @Override @@ -425,6 +354,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW, EdgeEventActionType.ADDED, entityView.getId(), null); } + futureToSet.set(null); } @Override @@ -434,8 +364,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { } }, dbCallbackExecutorService); } + } else { + futureToSet.set(null); } - futureToSet.set(null); } catch (Exception e) { log.error("Exception during loading relation(s) to edge on sync!", e); futureToSet.setException(e); @@ -451,7 +382,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { return futureToSet; } - private ListenableFuture saveEdgeEvent(TenantId tenantId, + private void saveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, EdgeEventActionType action, @@ -462,19 +393,8 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { EdgeEvent edgeEvent = EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body); - ListenableFuture future = edgeEventService.saveAsync(edgeEvent); - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable EdgeEvent result) { - tbClusterService.onEdgeEventUpdate(tenantId, edgeId); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Can't save edge event [{}] for edge [{}]", tenantId.getId(), edgeEvent, edgeId.getId(), t); - } - }, dbCallbackExecutorService); - return future; + edgeEventService.save(edgeEvent); + tbClusterService.onEdgeEventUpdate(tenantId, edgeId); } } diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index becdfabb32..dd9f67416a 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -939,7 +939,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { String timeseriesData = "{\"data\":{\"temperature\":25},\"ts\":" + System.currentTimeMillis() + "}"; JsonNode timeseriesEntityData = mapper.readTree(timeseriesData); EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData); - edgeEventService.saveAsync(edgeEvent); + edgeEventService.save(edgeEvent); clusterService.onEdgeEventUpdate(tenantId, edge.getId()); Assert.assertTrue(edgeImitator.waitForMessages()); @@ -978,7 +978,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { JsonNode attributesEntityData = mapper.readTree(attributesData); EdgeEvent edgeEvent1 = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, attributesEntityData); edgeImitator.expectMessageAmount(1); - edgeEventService.saveAsync(edgeEvent1); + edgeEventService.save(edgeEvent1); clusterService.onEdgeEventUpdate(tenantId, edge.getId()); Assert.assertTrue(edgeImitator.waitForMessages()); @@ -1003,7 +1003,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { JsonNode postAttributesEntityData = mapper.readTree(postAttributesData); EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.POST_ATTRIBUTES, device.getId().getId(), EdgeEventType.DEVICE, postAttributesEntityData); edgeImitator.expectMessageAmount(1); - edgeEventService.saveAsync(edgeEvent); + edgeEventService.save(edgeEvent); clusterService.onEdgeEventUpdate(tenantId, edge.getId()); Assert.assertTrue(edgeImitator.waitForMessages()); @@ -1028,7 +1028,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { JsonNode deleteAttributesEntityData = mapper.readTree(deleteAttributesData); EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_DELETED, device.getId().getId(), EdgeEventType.DEVICE, deleteAttributesEntityData); edgeImitator.expectMessageAmount(1); - edgeEventService.saveAsync(edgeEvent); + edgeEventService.save(edgeEvent); clusterService.onEdgeEventUpdate(tenantId, edge.getId()); Assert.assertTrue(edgeImitator.waitForMessages()); @@ -1062,7 +1062,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL, device.getId().getId(), EdgeEventType.DEVICE, body); edgeImitator.expectMessageAmount(1); - edgeEventService.saveAsync(edgeEvent); + edgeEventService.save(edgeEvent); clusterService.onEdgeEventUpdate(tenantId, edge.getId()); Assert.assertTrue(edgeImitator.waitForMessages()); @@ -1088,7 +1088,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { JsonNode timeseriesEntityData = mapper.readTree(timeseriesData); EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData); - edgeEventService.saveAsync(edgeEvent); + edgeEventService.save(edgeEvent); clusterService.onEdgeEventUpdate(tenantId, edge.getId()); } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeEventService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeEventService.java index d70ddd1184..f98b461694 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeEventService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeEventService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.dao.edge; -import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; @@ -24,7 +23,7 @@ import org.thingsboard.server.common.data.page.TimePageLink; public interface EdgeEventService { - ListenableFuture saveAsync(EdgeEvent edgeEvent); + EdgeEvent save(EdgeEvent edgeEvent); PageData findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate); diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java b/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java index b9a2e46f37..8d012f63a4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java @@ -15,9 +15,7 @@ */ package org.thingsboard.server.dao.edge; -import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.edge.EdgeEvent; @@ -36,9 +34,9 @@ public class BaseEdgeEventService implements EdgeEventService { private EdgeEventDao edgeEventDao; @Override - public ListenableFuture saveAsync(EdgeEvent edgeEvent) { + public EdgeEvent save(EdgeEvent edgeEvent) { edgeEventValidator.validate(edgeEvent, EdgeEvent::getTenantId); - return edgeEventDao.saveAsync(edgeEvent); + return edgeEventDao.save(edgeEvent); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java index 804703be76..92e0685105 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java @@ -30,12 +30,12 @@ import java.util.UUID; public interface EdgeEventDao extends Dao { /** - * Save or update edge event object async + * Save or update edge event object * * @param edgeEvent the event object * @return saved edge event object future */ - ListenableFuture saveAsync(EdgeEvent edgeEvent); + EdgeEvent save(EdgeEvent edgeEvent); /** diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java index fc4250044d..050d0181f0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java @@ -41,6 +41,10 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID; @@ -50,6 +54,8 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao readWriteLocks = new ConcurrentHashMap<>(); + @Autowired private EdgeEventRepository edgeEventRepository; @@ -64,47 +70,59 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao saveAsync(EdgeEvent edgeEvent) { - log.debug("Save edge event [{}] ", edgeEvent); - if (edgeEvent.getId() == null) { - UUID timeBased = Uuids.timeBased(); - edgeEvent.setId(new EdgeEventId(timeBased)); - edgeEvent.setCreatedTime(Uuids.unixTimestamp(timeBased)); - } else if (edgeEvent.getCreatedTime() == 0L) { - UUID eventId = edgeEvent.getId().getId(); - if (eventId.version() == 1) { - edgeEvent.setCreatedTime(Uuids.unixTimestamp(eventId)); - } else { - edgeEvent.setCreatedTime(System.currentTimeMillis()); + public EdgeEvent save(EdgeEvent edgeEvent) { + final Lock readWriteLock = readWriteLocks.computeIfAbsent(edgeEvent.getEdgeId(), id -> new ReentrantLock()); + readWriteLock.lock(); + try { + log.debug("Save edge event [{}] ", edgeEvent); + if (edgeEvent.getId() == null) { + UUID timeBased = Uuids.timeBased(); + edgeEvent.setId(new EdgeEventId(timeBased)); + edgeEvent.setCreatedTime(Uuids.unixTimestamp(timeBased)); + } else if (edgeEvent.getCreatedTime() == 0L) { + UUID eventId = edgeEvent.getId().getId(); + if (eventId.version() == 1) { + edgeEvent.setCreatedTime(Uuids.unixTimestamp(eventId)); + } else { + edgeEvent.setCreatedTime(System.currentTimeMillis()); + } } + if (StringUtils.isEmpty(edgeEvent.getUid())) { + edgeEvent.setUid(edgeEvent.getId().toString()); + } + return save(new EdgeEventEntity(edgeEvent)).orElse(null); + } finally { + readWriteLock.unlock(); } - if (StringUtils.isEmpty(edgeEvent.getUid())) { - edgeEvent.setUid(edgeEvent.getId().toString()); - } - return service.submit(() -> save(new EdgeEventEntity(edgeEvent)).orElse(null)); } @Override public PageData findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate) { - if (withTsUpdate) { - return DaoUtil.toPageData( - edgeEventRepository - .findEdgeEventsByTenantIdAndEdgeId( - tenantId, - edgeId.getId(), - pageLink.getStartTime(), - pageLink.getEndTime(), - DaoUtil.toPageable(pageLink))); - } else { - return DaoUtil.toPageData( - edgeEventRepository - .findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated( - tenantId, - edgeId.getId(), - pageLink.getStartTime(), - pageLink.getEndTime(), - DaoUtil.toPageable(pageLink))); + final Lock readWriteLock = readWriteLocks.computeIfAbsent(edgeId, id -> new ReentrantLock()); + readWriteLock.lock(); + try { + if (withTsUpdate) { + return DaoUtil.toPageData( + edgeEventRepository + .findEdgeEventsByTenantIdAndEdgeId( + tenantId, + edgeId.getId(), + pageLink.getStartTime(), + pageLink.getEndTime(), + DaoUtil.toPageable(pageLink))); + } else { + return DaoUtil.toPageData( + edgeEventRepository + .findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated( + tenantId, + edgeId.getId(), + pageLink.getStartTime(), + pageLink.getEndTime(), + DaoUtil.toPageable(pageLink))); + } + } finally { + readWriteLock.unlock(); } } diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseEdgeEventServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseEdgeEventServiceTest.java index a7a67a8f4b..7c86e38eec 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseEdgeEventServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseEdgeEventServiceTest.java @@ -42,7 +42,7 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest { EdgeId edgeId = new EdgeId(Uuids.timeBased()); DeviceId deviceId = new DeviceId(Uuids.timeBased()); EdgeEvent edgeEvent = generateEdgeEvent(null, edgeId, deviceId, EdgeEventActionType.ADDED); - EdgeEvent saved = edgeEventService.saveAsync(edgeEvent).get(); + EdgeEvent saved = edgeEventService.save(edgeEvent); Assert.assertEquals(saved.getTenantId(), edgeEvent.getTenantId()); Assert.assertEquals(saved.getEdgeId(), edgeEvent.getEdgeId()); Assert.assertEquals(saved.getEntityId(), edgeEvent.getEntityId()); @@ -109,7 +109,7 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest { TimePageLink pageLink = new TimePageLink(1); EdgeEvent edgeEventWithTsUpdate = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.TIMESERIES_UPDATED); - edgeEventService.saveAsync(edgeEventWithTsUpdate).get(); + edgeEventService.save(edgeEventWithTsUpdate); PageData allEdgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true); PageData edgeEventsWithoutTsUpdate = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, false); @@ -124,6 +124,6 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest { private EdgeEvent saveEdgeEventWithProvidedTime(long time, EdgeId edgeId, EntityId entityId, TenantId tenantId) throws Exception { EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, entityId, EdgeEventActionType.ADDED); edgeEvent.setId(new EdgeEventId(Uuids.startOf(time))); - return edgeEventService.saveAsync(edgeEvent).get(); + return edgeEventService.save(edgeEvent); } } \ No newline at end of file diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java index 911862c5a5..7a2cb4bda2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java @@ -149,20 +149,9 @@ public class TbMsgPushToEdgeNode implements TbNode { private void notifyEdge(TbContext ctx, TbMsg msg, EdgeEvent edgeEvent, EdgeId edgeId) { edgeEvent.setEdgeId(edgeId); - ListenableFuture saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent); - Futures.addCallback(saveFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable EdgeEvent event) { - ctx.tellNext(msg, SUCCESS); - ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId); - } - - @Override - public void onFailure(Throwable th) { - log.warn("[{}] Can't save edge event [{}] for edge [{}]", ctx.getTenantId().getId(), edgeEvent, edgeId.getId(), th); - ctx.tellFailure(msg, th); - } - }, ctx.getDbCallbackExecutor()); + ctx.getEdgeEventService().save(edgeEvent); + ctx.tellNext(msg, SUCCESS); + ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId); } private EdgeEvent buildEdgeEvent(TbMsg msg, TbContext ctx) { From 687978f9f1aab0c97d92cc7f69882e7d044164a9 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Thu, 22 Jul 2021 16:05:31 +0300 Subject: [PATCH 02/10] Formatting --- .../service/edge/rpc/processor/BaseEdgeProcessor.java | 10 +++++----- .../edge/rpc/sync/DefaultEdgeRequestsService.java | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index b2cf5d4c39..e213313228 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -175,11 +175,11 @@ public abstract class BaseEdgeProcessor { protected DbCallbackExecutorService dbCallbackExecutorService; protected void saveEdgeEvent(TenantId tenantId, - EdgeId edgeId, - EdgeEventType type, - EdgeEventActionType action, - EntityId entityId, - JsonNode body) { + EdgeId edgeId, + EdgeEventType type, + EdgeEventActionType action, + EntityId entityId, + JsonNode body) { log.debug("Pushing event to edge queue. tenantId [{}], edgeId [{}], type[{}], " + "action [{}], entityId [{}], body [{}]", tenantId, edgeId, type, action, entityId, body); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 33d6c65d05..7518291e38 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -383,11 +383,11 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { } private void saveEdgeEvent(TenantId tenantId, - EdgeId edgeId, - EdgeEventType type, - EdgeEventActionType action, - EntityId entityId, - JsonNode body) { + EdgeId edgeId, + EdgeEventType type, + EdgeEventActionType action, + EntityId entityId, + JsonNode body) { log.trace("Pushing edge event to edge queue. tenantId [{}], edgeId [{}], type [{}], action[{}], entityId [{}], body [{}]", tenantId, edgeId, type, action, entityId, body); From c7386df829c8c50cd4ebdecafc04247de7cb4d2d Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Thu, 22 Jul 2021 16:15:45 +0300 Subject: [PATCH 03/10] Improved stability of handling errors in processEntityViewsRequestMsg function --- .../rpc/sync/DefaultEdgeRequestsService.java | 35 +++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 7518291e38..d7971e78be 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -141,8 +141,8 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { if (type != null) { SettableFuture futureToSet = SettableFuture.create(); String scope = attributesRequestMsg.getScope(); - ListenableFuture> ssAttrFuture = attributesService.findAll(tenantId, entityId, scope); - Futures.addCallback(ssAttrFuture, new FutureCallback>() { + ListenableFuture> findAttrFuture = attributesService.findAll(tenantId, entityId, scope); + Futures.addCallback(findAttrFuture, new FutureCallback>() { @Override public void onSuccess(@Nullable List ssAttributes) { if (ssAttributes != null && !ssAttributes.isEmpty()) { @@ -171,8 +171,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { entityId, body); } catch (Exception e) { - log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e); - throw new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e); + log.error("[{}] Failed to save attribute updates to the edge", edge.getName(), e); + futureToSet.setException(new RuntimeException("[" + edge.getName() + "] Failed to send attribute updates to the edge", e)); + return; } } else { log.trace("[{}][{}] No attributes found for entity {} [{}]", tenantId, @@ -185,7 +186,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { @Override public void onFailure(Throwable t) { - log.error("Can't save attributes [{}]", attributesRequestMsg, t); + log.error("Can't find attributes [{}]", attributesRequestMsg, t); futureToSet.setException(t); } }, dbCallbackExecutorService); @@ -345,25 +346,37 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { public void onSuccess(@Nullable List entityViews) { try { if (entityViews != null && !entityViews.isEmpty()) { + List> futures = new ArrayList<>(); for (EntityView entityView : entityViews) { - Futures.addCallback(relationService.checkRelation(tenantId, edge.getId(), entityView.getId(), - EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE), new FutureCallback<>() { + ListenableFuture future = relationService.checkRelation(tenantId, edge.getId(), entityView.getId(), + EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); + futures.add(future); + Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(@Nullable Boolean result) { if (Boolean.TRUE.equals(result)) { saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW, EdgeEventActionType.ADDED, entityView.getId(), null); } - futureToSet.set(null); } - @Override public void onFailure(Throwable t) { - log.error("Exception during loading relation [{}] to edge on sync!", t, t); - futureToSet.setException(t); + // Do nothing - error handles in allAsList } }, dbCallbackExecutorService); } + Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List result) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("Exception during loading relation [{}] to edge on sync!", t, t); + futureToSet.setException(t); + } + }, dbCallbackExecutorService); } else { futureToSet.set(null); } From 339097e606c75f2150498ce8376c4f722bcb935f Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 23 Jul 2021 11:14:41 +0300 Subject: [PATCH 04/10] Removed unused class field --- .../server/service/edge/DefaultEdgeNotificationService.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index 4fd1405555..f45fd80068 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -37,7 +37,6 @@ import org.thingsboard.server.service.edge.rpc.processor.CustomerEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.EntityEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.RelationEdgeProcessor; -import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.queue.TbClusterService; import javax.annotation.PostConstruct; @@ -61,9 +60,6 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { @Autowired private TbClusterService clusterService; - @Autowired - private DbCallbackExecutorService dbCallbackExecutorService; - @Autowired private EdgeProcessor edgeProcessor; From 7212617cf02bbfbea35fde03b15a0029223114b4 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 23 Jul 2021 11:29:13 +0300 Subject: [PATCH 05/10] Temporary disabled unstable test --- .../server/dao/service/BaseDashboardServiceTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java index 86bad0b1a0..6e1d8e66d8 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Dashboard; @@ -208,6 +209,7 @@ public abstract class BaseDashboardServiceTest extends AbstractServiceTest { } @Test + @Ignore public void testFindMobileDashboardsByTenantId() { Tenant tenant = new Tenant(); tenant.setTitle("Test tenant"); From 72f7aa3bb2a262fe90157257dc570ddcc06cd6ea Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 23 Jul 2021 11:48:13 +0300 Subject: [PATCH 06/10] Added comparator for loaded mobile dashboard and sort by ID if created time equals --- .../dao/service/BaseDashboardServiceTest.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java index 86bad0b1a0..640a9f615c 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java @@ -241,19 +241,26 @@ public abstract class BaseDashboardServiceTest extends AbstractServiceTest { } } while (pageData.hasNext()); - Collections.sort(mobileDashboards, (o1, o2) -> { + Comparator dashboardInfoComparator = (o1, o2) -> { Integer order1 = o1.getMobileOrder(); Integer order2 = o2.getMobileOrder(); if (order1 == null && order2 == null) { - return (int)(o1.getCreatedTime() - o2.getCreatedTime()); + if (o1.getCreatedTime() == o2.getCreatedTime()) { + return o1.getUuidId().compareTo(o2.getUuidId()); + } else { + return (int) (o1.getCreatedTime() - o2.getCreatedTime()); + } } else if (order1 == null && order2 != null) { return 1; - } else if (order2 == null) { + } else if (order2 == null) { return -1; } else { return order1 - order2; } - }); + }; + + Collections.sort(mobileDashboards, dashboardInfoComparator); + Collections.sort(loadedMobileDashboards, dashboardInfoComparator); Assert.assertEquals(mobileDashboards, loadedMobileDashboards); From 2aea5fc7a920af4cc32d36f4f0754e17f1b140da Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 23 Jul 2021 12:46:57 +0300 Subject: [PATCH 07/10] Fixed test --- .../dao/service/BaseDashboardServiceTest.java | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java index bfd6a2a048..2201279750 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java @@ -233,7 +233,7 @@ public abstract class BaseDashboardServiceTest extends AbstractServiceTest { } List loadedMobileDashboards = new ArrayList<>(); - PageLink pageLink = new PageLink(16, 0, null, new SortOrder("createdTime", SortOrder.Direction.ASC)); + PageLink pageLink = new PageLink(16, 0, null, new SortOrder("title", SortOrder.Direction.ASC)); PageData pageData = null; do { pageData = dashboardService.findMobileDashboardsByTenantId(tenantId, pageLink); @@ -243,15 +243,11 @@ public abstract class BaseDashboardServiceTest extends AbstractServiceTest { } } while (pageData.hasNext()); - Comparator dashboardInfoComparator = (o1, o2) -> { + Collections.sort(mobileDashboards, (o1, o2) -> { Integer order1 = o1.getMobileOrder(); Integer order2 = o2.getMobileOrder(); if (order1 == null && order2 == null) { - if (o1.getCreatedTime() == o2.getCreatedTime()) { - return o1.getUuidId().compareTo(o2.getUuidId()); - } else { - return (int) (o1.getCreatedTime() - o2.getCreatedTime()); - } + return o1.getTitle().compareTo(o2.getTitle()); } else if (order1 == null && order2 != null) { return 1; } else if (order2 == null) { @@ -259,10 +255,7 @@ public abstract class BaseDashboardServiceTest extends AbstractServiceTest { } else { return order1 - order2; } - }; - - Collections.sort(mobileDashboards, dashboardInfoComparator); - Collections.sort(loadedMobileDashboards, dashboardInfoComparator); + }); Assert.assertEquals(mobileDashboards, loadedMobileDashboards); From eb56b11b2f1816d3aa4d76d9050f3ecfb7a18fef Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 23 Jul 2021 14:41:38 +0300 Subject: [PATCH 08/10] UnIgnore test --- .../thingsboard/server/dao/service/BaseDashboardServiceTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java index 2201279750..1d5019d65a 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java @@ -209,7 +209,6 @@ public abstract class BaseDashboardServiceTest extends AbstractServiceTest { } @Test - @Ignore public void testFindMobileDashboardsByTenantId() { Tenant tenant = new Tenant(); tenant.setTitle("Test tenant"); From 99864776ad77d53f1d985f3df2842878855857d9 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 23 Jul 2021 15:15:07 +0300 Subject: [PATCH 09/10] Removed unused import --- .../thingsboard/server/dao/service/BaseDashboardServiceTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java index 1d5019d65a..0e57208c07 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java @@ -20,7 +20,6 @@ import org.apache.commons.lang3.RandomStringUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Dashboard; From 2ce04c3728a68585fa6a0d9f4e729b47c4422cde Mon Sep 17 00:00:00 2001 From: Igor Kulikov Date: Fri, 30 Jul 2021 11:41:09 +0300 Subject: [PATCH 10/10] Update BaseDashboardServiceTest.java --- .../server/dao/service/BaseDashboardServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java index 0e57208c07..ce307b74b0 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseDashboardServiceTest.java @@ -248,7 +248,7 @@ public abstract class BaseDashboardServiceTest extends AbstractServiceTest { return o1.getTitle().compareTo(o2.getTitle()); } else if (order1 == null && order2 != null) { return 1; - } else if (order2 == null) { + } else if (order2 == null) { return -1; } else { return order1 - order2;