From d1a9c4432acb5398497a01b148e01018933fa0b3 Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Tue, 30 Jan 2024 11:56:10 +0200 Subject: [PATCH] Add rate limit of at most one event per second per originator in node --- .../server/common/data/msg/TbMsgType.java | 1 + .../rule/engine/action/TbDeviceStateNode.java | 86 +++++++- .../engine/action/TbDeviceStateNodeTest.java | 187 +++++++++++++++--- 3 files changed, 239 insertions(+), 35 deletions(-) diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java b/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java index 763f245a09..c7eeef158d 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/msg/TbMsgType.java @@ -76,6 +76,7 @@ public enum TbMsgType { DEDUPLICATION_TIMEOUT_SELF_MSG(null, true), DELAY_TIMEOUT_SELF_MSG(null, true), MSG_COUNT_SELF_MSG(null, true), + DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG(null, true), // Custom or N/A type: NA; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java index d8cc81679c..a74df13f20 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.action; +import com.google.common.base.Stopwatch; import lombok.extern.slf4j.Slf4j; import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; import org.thingsboard.rule.engine.api.RuleNode; @@ -29,10 +30,15 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.TbCallback; +import java.time.Duration; import java.util.EnumSet; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; @Slf4j @RuleNode( @@ -61,7 +67,11 @@ public class TbDeviceStateNode implements TbNode { private static final Set SUPPORTED_EVENTS = EnumSet.of( TbMsgType.CONNECT_EVENT, TbMsgType.ACTIVITY_EVENT, TbMsgType.DISCONNECT_EVENT, TbMsgType.INACTIVITY_EVENT ); + private static final Duration ONE_SECOND = Duration.ofSeconds(1L); + private static final Duration ENTRY_EXPIRATION_TIME = Duration.ofDays(1L); + private Stopwatch stopwatch; + private ConcurrentMap lastActivityEventTimestamps; private TbMsgType event; @Override @@ -74,31 +84,82 @@ public class TbDeviceStateNode implements TbNode { throw new TbNodeException("Unsupported event: " + event, true); } this.event = event; + lastActivityEventTimestamps = new ConcurrentHashMap<>(); + stopwatch = Stopwatch.createStarted(); + scheduleCleanupMsg(ctx); } @Override public void onMsg(TbContext ctx, TbMsg msg) { + if (msg.isTypeOf(TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG)) { + removeStaleEntries(); + scheduleCleanupMsg(ctx); + ctx.ack(msg); + return; + } + if (!EntityType.DEVICE.equals(msg.getOriginator().getEntityType())) { ctx.tellSuccess(msg); return; } + DeviceId originator = new DeviceId(msg.getOriginator().getId()); + + lastActivityEventTimestamps.compute(originator, (__, lastEventTs) -> { + Duration now = stopwatch.elapsed(); + + if (lastEventTs == null) { + sendEvent(ctx, originator, msg); + return now; + } + + Duration elapsedSinceLastEventSent = now.minus(lastEventTs); + if (elapsedSinceLastEventSent.compareTo(ONE_SECOND) < 0) { + ctx.tellSuccess(msg); + return lastEventTs; + } + + sendEvent(ctx, originator, msg); + + return now; + }); + } + + private void scheduleCleanupMsg(TbContext ctx) { + TbMsg cleanupMsg = ctx.newMsg( + null, TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG, ctx.getSelfId(), TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING + ); + ctx.tellSelf(cleanupMsg, Duration.ofHours(1L).toMillis()); + } + + private void removeStaleEntries() { + lastActivityEventTimestamps.entrySet().removeIf(entry -> { + Duration now = stopwatch.elapsed(); + Duration lastEventTs = entry.getValue(); + Duration elapsedSinceLastEventSent = now.minus(lastEventTs); + return elapsedSinceLastEventSent.compareTo(ENTRY_EXPIRATION_TIME) > 0; + }); + } + + private void sendEvent(TbContext ctx, DeviceId originator, TbMsg msg) { TenantId tenantId = ctx.getTenantId(); - DeviceId deviceId = (DeviceId) msg.getOriginator(); + long eventTs = msg.getMetaDataTs(); + RuleEngineDeviceStateManager deviceStateManager = ctx.getDeviceStateManager(); + TbCallback callback = getMsgEnqueuedCallback(ctx, msg); switch (event) { case CONNECT_EVENT: - deviceStateManager.onDeviceConnect(tenantId, deviceId, msg.getMetaDataTs(), getMsgEnqueuedCallback(ctx, msg)); + deviceStateManager.onDeviceConnect(tenantId, originator, eventTs, callback); break; case ACTIVITY_EVENT: - deviceStateManager.onDeviceActivity(tenantId, deviceId, msg.getMetaDataTs(), getMsgEnqueuedCallback(ctx, msg)); + deviceStateManager.onDeviceActivity(tenantId, originator, eventTs, callback); break; case DISCONNECT_EVENT: - deviceStateManager.onDeviceDisconnect(tenantId, deviceId, msg.getMetaDataTs(), getMsgEnqueuedCallback(ctx, msg)); + deviceStateManager.onDeviceDisconnect(tenantId, originator, eventTs, callback); break; case INACTIVITY_EVENT: - deviceStateManager.onDeviceInactivity(tenantId, deviceId, msg.getMetaDataTs(), getMsgEnqueuedCallback(ctx, msg)); + deviceStateManager.onDeviceInactivity(tenantId, originator, eventTs, callback); break; default: ctx.tellFailure(msg, new IllegalStateException("Configured event [" + event + "] is not supported!")); @@ -119,4 +180,19 @@ public class TbDeviceStateNode implements TbNode { }; } + @Override + public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) { + lastActivityEventTimestamps.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey())); + } + + @Override + public void destroy() { + if (lastActivityEventTimestamps != null) { + lastActivityEventTimestamps.clear(); + lastActivityEventTimestamps = null; + } + stopwatch = null; + event = null; + } + } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java index e967ec49b5..af72927e3c 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java @@ -15,6 +15,8 @@ */ package org.thingsboard.rule.engine.action; +import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -26,29 +28,35 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; +import java.time.Duration; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; @@ -62,28 +70,33 @@ public class TbDeviceStateNodeTest { @Captor private static ArgumentCaptor callbackCaptor; private TbDeviceStateNode node; + private RuleNodeId nodeId; private TbDeviceStateNodeConfiguration config; private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.randomUUID()); private static final DeviceId DEVICE_ID = new DeviceId(UUID.randomUUID()); private static final long METADATA_TS = System.currentTimeMillis(); + private TbMsg cleanupMsg; private TbMsg msg; + private long nowNanos; + private final Ticker controlledTicker = new Ticker() { + @Override + public long read() { + return nowNanos; + } + }; @BeforeEach public void setup() { - var device = new Device(); - device.setTenantId(TENANT_ID); - device.setId(DEVICE_ID); - device.setName("My humidity sensor"); - device.setType("Humidity sensor"); - device.setDeviceProfileId(new DeviceProfileId(UUID.randomUUID())); var metaData = new TbMsgMetaData(); - metaData.putValue("deviceName", device.getName()); - metaData.putValue("deviceType", device.getType()); + metaData.putValue("deviceName", "My humidity sensor"); + metaData.putValue("deviceType", "Humidity sensor"); metaData.putValue("ts", String.valueOf(METADATA_TS)); var data = JacksonUtil.newObjectNode(); data.put("humidity", 58.3); - msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, device.getId(), metaData, JacksonUtil.toString(data)); + msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, JacksonUtil.toString(data)); + nodeId = new RuleNodeId(UUID.randomUUID()); + cleanupMsg = TbMsg.newMsg(null, TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG, nodeId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); } @BeforeEach @@ -99,17 +112,121 @@ public class TbDeviceStateNodeTest { @Test public void givenNullEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException() { - // GIVEN - config.setEvent(null); - var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - - // WHEN-THEN - assertThatThrownBy(() -> node.init(ctxMock, nodeConfig)) + // GIVEN-WHEN-THEN + assertThatThrownBy(() -> initNode(null)) .isInstanceOf(TbNodeException.class) .hasMessage("Event cannot be null!") .matches(e -> ((TbNodeException) e).isUnrecoverable()); } + @Test + public void givenValidConfig_whenInit_thenSchedulesCleanupMsg() { + // GIVEN + given(ctxMock.getSelfId()).willReturn(nodeId); + given(ctxMock.newMsg(isNull(), eq(TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG), eq(nodeId), eq(TbMsgMetaData.EMPTY), eq(TbMsg.EMPTY_STRING))).willReturn(cleanupMsg); + + // WHEN + try { + initNode(TbMsgType.ACTIVITY_EVENT); + } catch (Exception e) { + fail("Node failed to initialize."); + } + + // THEN + verifyCleanupMsgSent(); + } + + @Test + public void givenCleanupMsg_whenOnMsg_thenCleansStaleEntries() { + // GIVEN + given(ctxMock.getSelfId()).willReturn(nodeId); + given(ctxMock.newMsg(isNull(), eq(TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG), eq(nodeId), eq(TbMsgMetaData.EMPTY), eq(TbMsg.EMPTY_STRING))).willReturn(cleanupMsg); + + ConcurrentMap lastActivityEventTimestamps = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(node, "lastActivityEventTimestamps", lastActivityEventTimestamps); + + Stopwatch stopwatch = Stopwatch.createStarted(controlledTicker); + ReflectionTestUtils.setField(node, "stopwatch", stopwatch); + + // WHEN + Duration expirationTime = Duration.ofDays(1L); + + DeviceId staleId = DEVICE_ID; + Duration staleTs = Duration.ofHours(4L); + lastActivityEventTimestamps.put(staleId, staleTs); + + DeviceId goodId = new DeviceId(UUID.randomUUID()); + Duration goodTs = staleTs.plus(expirationTime); + lastActivityEventTimestamps.put(goodId, goodTs); + + nowNanos = staleTs.toNanos() + expirationTime.toNanos() + 1; + node.onMsg(ctxMock, cleanupMsg); + + // THEN + assertThat(lastActivityEventTimestamps) + .containsKey(goodId) + .doesNotContainKey(staleId) + .size().isOne(); + + verifyCleanupMsgSent(); + then(ctxMock).should().ack(cleanupMsg); + then(ctxMock).shouldHaveNoMoreInteractions(); + } + + @Test + public void givenMsgArrivedTooFast_whenOnMsg_thenRateLimitsThisMsg() { + // GIVEN + ConcurrentMap lastActivityEventTimestamps = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(node, "lastActivityEventTimestamps", lastActivityEventTimestamps); + + Stopwatch stopwatch = Stopwatch.createStarted(controlledTicker); + ReflectionTestUtils.setField(node, "stopwatch", stopwatch); + + // WHEN + Duration firstEventTs = Duration.ofMillis(1000L); + lastActivityEventTimestamps.put(DEVICE_ID, firstEventTs); + + Duration tooFastEventTs = firstEventTs.plus(Duration.ofMillis(999L)); + nowNanos = tooFastEventTs.toNanos(); + node.onMsg(ctxMock, msg); + + // THEN + Duration actualEventTs = lastActivityEventTimestamps.get(DEVICE_ID); + assertThat(actualEventTs).isEqualTo(firstEventTs); + + then(ctxMock).should().tellSuccess(msg); + then(ctxMock).shouldHaveNoMoreInteractions(); + then(deviceStateManagerMock).shouldHaveNoInteractions(); + } + + @Test + public void givenHasNonLocalDevices_whenOnPartitionChange_thenRemovesEntriesForNonLocalDevices() { + // GIVEN + ConcurrentMap lastActivityEventTimestamps = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(node, "lastActivityEventTimestamps", lastActivityEventTimestamps); + + lastActivityEventTimestamps.put(DEVICE_ID, Duration.ofHours(24L)); + given(ctxMock.isLocalEntity(eq(DEVICE_ID))).willReturn(true); + + DeviceId nonLocalDeviceId1 = new DeviceId(UUID.randomUUID()); + lastActivityEventTimestamps.put(nonLocalDeviceId1, Duration.ofHours(30L)); + given(ctxMock.isLocalEntity(eq(nonLocalDeviceId1))).willReturn(false); + + DeviceId nonLocalDeviceId2 = new DeviceId(UUID.randomUUID()); + lastActivityEventTimestamps.put(nonLocalDeviceId2, Duration.ofHours(32L)); + given(ctxMock.isLocalEntity(eq(nonLocalDeviceId2))).willReturn(false); + + // WHEN + node.onPartitionChangeMsg(ctxMock, new PartitionChangeMsg(ServiceType.TB_RULE_ENGINE)); + + // THEN + assertThat(lastActivityEventTimestamps) + .containsKey(DEVICE_ID) + .doesNotContainKey(nonLocalDeviceId1) + .doesNotContainKey(nonLocalDeviceId2) + .size().isOne(); + } + @ParameterizedTest @EnumSource( value = TbMsgType.class, @@ -117,12 +234,8 @@ public class TbDeviceStateNodeTest { mode = EnumSource.Mode.EXCLUDE ) public void givenUnsupportedEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException(TbMsgType unsupportedEvent) { - // GIVEN - config.setEvent(unsupportedEvent); - var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); - - // WHEN-THEN - assertThatThrownBy(() -> node.init(ctxMock, nodeConfig)) + // GIVEN-WHEN-THEN + assertThatThrownBy(() -> initNode(unsupportedEvent)) .isInstanceOf(TbNodeException.class) .hasMessage("Unsupported event: " + unsupportedEvent) .matches(e -> ((TbNodeException) e).isUnrecoverable()); @@ -156,17 +269,19 @@ public class TbDeviceStateNodeTest { @ParameterizedTest @MethodSource - public void givenInactivityEventAndDeviceOriginator_whenOnMsg_thenOnDeviceInactivityIsCalledWithCorrectCallback(TbMsgType supportedEventType, Runnable actionVerification) { + public void givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback(TbMsgType supportedEventType, Runnable actionVerification) { // GIVEN - config.setEvent(supportedEventType); - var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + given(ctxMock.getSelfId()).willReturn(nodeId); + given(ctxMock.newMsg(isNull(), eq(TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG), eq(nodeId), eq(TbMsgMetaData.EMPTY), eq(TbMsg.EMPTY_STRING))).willReturn(cleanupMsg); + given(ctxMock.getTenantId()).willReturn(TENANT_ID); + given(ctxMock.getDeviceStateManager()).willReturn(deviceStateManagerMock); + try { - node.init(ctxMock, nodeConfig); + initNode(supportedEventType); } catch (TbNodeException e) { fail("Node failed to initialize!", e); } - given(ctxMock.getTenantId()).willReturn(TENANT_ID); - given(ctxMock.getDeviceStateManager()).willReturn(deviceStateManagerMock); + verifyCleanupMsgSent(); // WHEN node.onMsg(ctxMock, msg); @@ -188,7 +303,7 @@ public class TbDeviceStateNodeTest { then(ctxMock).shouldHaveNoMoreInteractions(); } - private static Stream givenInactivityEventAndDeviceOriginator_whenOnMsg_thenOnDeviceInactivityIsCalledWithCorrectCallback() { + private static Stream givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback() { return Stream.of( Arguments.of(TbMsgType.CONNECT_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceConnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())), Arguments.of(TbMsgType.ACTIVITY_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())), @@ -197,4 +312,16 @@ public class TbDeviceStateNodeTest { ); } + private void initNode(TbMsgType event) throws TbNodeException { + config.setEvent(event); + var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + node.init(ctxMock, nodeConfig); + } + + private void verifyCleanupMsgSent() { + then(ctxMock).should().getSelfId(); + then(ctxMock).should().newMsg(isNull(), eq(TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG), eq(nodeId), eq(TbMsgMetaData.EMPTY), eq(TbMsg.EMPTY_STRING)); + then(ctxMock).should().tellSelf(eq(cleanupMsg), eq(Duration.ofHours(1L).toMillis())); + } + }