Add rate limit of at most one event per second per originator in node

This commit is contained in:
Dmytro Skarzhynets 2024-01-30 11:56:10 +02:00 committed by Dmytro Skarzhynets
parent b19d7f1406
commit d1a9c4432a
3 changed files with 239 additions and 35 deletions

View File

@ -76,6 +76,7 @@ public enum TbMsgType {
DEDUPLICATION_TIMEOUT_SELF_MSG(null, true),
DELAY_TIMEOUT_SELF_MSG(null, true),
MSG_COUNT_SELF_MSG(null, true),
DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG(null, true),
// Custom or N/A type:
NA;

View File

@ -15,6 +15,7 @@
*/
package org.thingsboard.rule.engine.action;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.RuleNode;
@ -29,10 +30,15 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import java.time.Duration;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Slf4j
@RuleNode(
@ -61,7 +67,11 @@ public class TbDeviceStateNode implements TbNode {
private static final Set<TbMsgType> SUPPORTED_EVENTS = EnumSet.of(
TbMsgType.CONNECT_EVENT, TbMsgType.ACTIVITY_EVENT, TbMsgType.DISCONNECT_EVENT, TbMsgType.INACTIVITY_EVENT
);
private static final Duration ONE_SECOND = Duration.ofSeconds(1L);
private static final Duration ENTRY_EXPIRATION_TIME = Duration.ofDays(1L);
private Stopwatch stopwatch;
private ConcurrentMap<DeviceId, Duration> lastActivityEventTimestamps;
private TbMsgType event;
@Override
@ -74,31 +84,82 @@ public class TbDeviceStateNode implements TbNode {
throw new TbNodeException("Unsupported event: " + event, true);
}
this.event = event;
lastActivityEventTimestamps = new ConcurrentHashMap<>();
stopwatch = Stopwatch.createStarted();
scheduleCleanupMsg(ctx);
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
if (msg.isTypeOf(TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG)) {
removeStaleEntries();
scheduleCleanupMsg(ctx);
ctx.ack(msg);
return;
}
if (!EntityType.DEVICE.equals(msg.getOriginator().getEntityType())) {
ctx.tellSuccess(msg);
return;
}
DeviceId originator = new DeviceId(msg.getOriginator().getId());
lastActivityEventTimestamps.compute(originator, (__, lastEventTs) -> {
Duration now = stopwatch.elapsed();
if (lastEventTs == null) {
sendEvent(ctx, originator, msg);
return now;
}
Duration elapsedSinceLastEventSent = now.minus(lastEventTs);
if (elapsedSinceLastEventSent.compareTo(ONE_SECOND) < 0) {
ctx.tellSuccess(msg);
return lastEventTs;
}
sendEvent(ctx, originator, msg);
return now;
});
}
private void scheduleCleanupMsg(TbContext ctx) {
TbMsg cleanupMsg = ctx.newMsg(
null, TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG, ctx.getSelfId(), TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING
);
ctx.tellSelf(cleanupMsg, Duration.ofHours(1L).toMillis());
}
private void removeStaleEntries() {
lastActivityEventTimestamps.entrySet().removeIf(entry -> {
Duration now = stopwatch.elapsed();
Duration lastEventTs = entry.getValue();
Duration elapsedSinceLastEventSent = now.minus(lastEventTs);
return elapsedSinceLastEventSent.compareTo(ENTRY_EXPIRATION_TIME) > 0;
});
}
private void sendEvent(TbContext ctx, DeviceId originator, TbMsg msg) {
TenantId tenantId = ctx.getTenantId();
DeviceId deviceId = (DeviceId) msg.getOriginator();
long eventTs = msg.getMetaDataTs();
RuleEngineDeviceStateManager deviceStateManager = ctx.getDeviceStateManager();
TbCallback callback = getMsgEnqueuedCallback(ctx, msg);
switch (event) {
case CONNECT_EVENT:
deviceStateManager.onDeviceConnect(tenantId, deviceId, msg.getMetaDataTs(), getMsgEnqueuedCallback(ctx, msg));
deviceStateManager.onDeviceConnect(tenantId, originator, eventTs, callback);
break;
case ACTIVITY_EVENT:
deviceStateManager.onDeviceActivity(tenantId, deviceId, msg.getMetaDataTs(), getMsgEnqueuedCallback(ctx, msg));
deviceStateManager.onDeviceActivity(tenantId, originator, eventTs, callback);
break;
case DISCONNECT_EVENT:
deviceStateManager.onDeviceDisconnect(tenantId, deviceId, msg.getMetaDataTs(), getMsgEnqueuedCallback(ctx, msg));
deviceStateManager.onDeviceDisconnect(tenantId, originator, eventTs, callback);
break;
case INACTIVITY_EVENT:
deviceStateManager.onDeviceInactivity(tenantId, deviceId, msg.getMetaDataTs(), getMsgEnqueuedCallback(ctx, msg));
deviceStateManager.onDeviceInactivity(tenantId, originator, eventTs, callback);
break;
default:
ctx.tellFailure(msg, new IllegalStateException("Configured event [" + event + "] is not supported!"));
@ -119,4 +180,19 @@ public class TbDeviceStateNode implements TbNode {
};
}
@Override
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
lastActivityEventTimestamps.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
}
@Override
public void destroy() {
if (lastActivityEventTimestamps != null) {
lastActivityEventTimestamps.clear();
lastActivityEventTimestamps = null;
}
stopwatch = null;
event = null;
}
}

View File

@ -15,6 +15,8 @@
*/
package org.thingsboard.rule.engine.action;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -26,29 +28,35 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
@ -62,28 +70,33 @@ public class TbDeviceStateNodeTest {
@Captor
private static ArgumentCaptor<TbCallback> callbackCaptor;
private TbDeviceStateNode node;
private RuleNodeId nodeId;
private TbDeviceStateNodeConfiguration config;
private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.randomUUID());
private static final DeviceId DEVICE_ID = new DeviceId(UUID.randomUUID());
private static final long METADATA_TS = System.currentTimeMillis();
private TbMsg cleanupMsg;
private TbMsg msg;
private long nowNanos;
private final Ticker controlledTicker = new Ticker() {
@Override
public long read() {
return nowNanos;
}
};
@BeforeEach
public void setup() {
var device = new Device();
device.setTenantId(TENANT_ID);
device.setId(DEVICE_ID);
device.setName("My humidity sensor");
device.setType("Humidity sensor");
device.setDeviceProfileId(new DeviceProfileId(UUID.randomUUID()));
var metaData = new TbMsgMetaData();
metaData.putValue("deviceName", device.getName());
metaData.putValue("deviceType", device.getType());
metaData.putValue("deviceName", "My humidity sensor");
metaData.putValue("deviceType", "Humidity sensor");
metaData.putValue("ts", String.valueOf(METADATA_TS));
var data = JacksonUtil.newObjectNode();
data.put("humidity", 58.3);
msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, device.getId(), metaData, JacksonUtil.toString(data));
msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metaData, JacksonUtil.toString(data));
nodeId = new RuleNodeId(UUID.randomUUID());
cleanupMsg = TbMsg.newMsg(null, TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG, nodeId, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING);
}
@BeforeEach
@ -99,17 +112,121 @@ public class TbDeviceStateNodeTest {
@Test
public void givenNullEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException() {
// GIVEN
config.setEvent(null);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
// WHEN-THEN
assertThatThrownBy(() -> node.init(ctxMock, nodeConfig))
// GIVEN-WHEN-THEN
assertThatThrownBy(() -> initNode(null))
.isInstanceOf(TbNodeException.class)
.hasMessage("Event cannot be null!")
.matches(e -> ((TbNodeException) e).isUnrecoverable());
}
@Test
public void givenValidConfig_whenInit_thenSchedulesCleanupMsg() {
// GIVEN
given(ctxMock.getSelfId()).willReturn(nodeId);
given(ctxMock.newMsg(isNull(), eq(TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG), eq(nodeId), eq(TbMsgMetaData.EMPTY), eq(TbMsg.EMPTY_STRING))).willReturn(cleanupMsg);
// WHEN
try {
initNode(TbMsgType.ACTIVITY_EVENT);
} catch (Exception e) {
fail("Node failed to initialize.");
}
// THEN
verifyCleanupMsgSent();
}
@Test
public void givenCleanupMsg_whenOnMsg_thenCleansStaleEntries() {
// GIVEN
given(ctxMock.getSelfId()).willReturn(nodeId);
given(ctxMock.newMsg(isNull(), eq(TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG), eq(nodeId), eq(TbMsgMetaData.EMPTY), eq(TbMsg.EMPTY_STRING))).willReturn(cleanupMsg);
ConcurrentMap<DeviceId, Duration> lastActivityEventTimestamps = new ConcurrentHashMap<>();
ReflectionTestUtils.setField(node, "lastActivityEventTimestamps", lastActivityEventTimestamps);
Stopwatch stopwatch = Stopwatch.createStarted(controlledTicker);
ReflectionTestUtils.setField(node, "stopwatch", stopwatch);
// WHEN
Duration expirationTime = Duration.ofDays(1L);
DeviceId staleId = DEVICE_ID;
Duration staleTs = Duration.ofHours(4L);
lastActivityEventTimestamps.put(staleId, staleTs);
DeviceId goodId = new DeviceId(UUID.randomUUID());
Duration goodTs = staleTs.plus(expirationTime);
lastActivityEventTimestamps.put(goodId, goodTs);
nowNanos = staleTs.toNanos() + expirationTime.toNanos() + 1;
node.onMsg(ctxMock, cleanupMsg);
// THEN
assertThat(lastActivityEventTimestamps)
.containsKey(goodId)
.doesNotContainKey(staleId)
.size().isOne();
verifyCleanupMsgSent();
then(ctxMock).should().ack(cleanupMsg);
then(ctxMock).shouldHaveNoMoreInteractions();
}
@Test
public void givenMsgArrivedTooFast_whenOnMsg_thenRateLimitsThisMsg() {
// GIVEN
ConcurrentMap<DeviceId, Duration> lastActivityEventTimestamps = new ConcurrentHashMap<>();
ReflectionTestUtils.setField(node, "lastActivityEventTimestamps", lastActivityEventTimestamps);
Stopwatch stopwatch = Stopwatch.createStarted(controlledTicker);
ReflectionTestUtils.setField(node, "stopwatch", stopwatch);
// WHEN
Duration firstEventTs = Duration.ofMillis(1000L);
lastActivityEventTimestamps.put(DEVICE_ID, firstEventTs);
Duration tooFastEventTs = firstEventTs.plus(Duration.ofMillis(999L));
nowNanos = tooFastEventTs.toNanos();
node.onMsg(ctxMock, msg);
// THEN
Duration actualEventTs = lastActivityEventTimestamps.get(DEVICE_ID);
assertThat(actualEventTs).isEqualTo(firstEventTs);
then(ctxMock).should().tellSuccess(msg);
then(ctxMock).shouldHaveNoMoreInteractions();
then(deviceStateManagerMock).shouldHaveNoInteractions();
}
@Test
public void givenHasNonLocalDevices_whenOnPartitionChange_thenRemovesEntriesForNonLocalDevices() {
// GIVEN
ConcurrentMap<DeviceId, Duration> lastActivityEventTimestamps = new ConcurrentHashMap<>();
ReflectionTestUtils.setField(node, "lastActivityEventTimestamps", lastActivityEventTimestamps);
lastActivityEventTimestamps.put(DEVICE_ID, Duration.ofHours(24L));
given(ctxMock.isLocalEntity(eq(DEVICE_ID))).willReturn(true);
DeviceId nonLocalDeviceId1 = new DeviceId(UUID.randomUUID());
lastActivityEventTimestamps.put(nonLocalDeviceId1, Duration.ofHours(30L));
given(ctxMock.isLocalEntity(eq(nonLocalDeviceId1))).willReturn(false);
DeviceId nonLocalDeviceId2 = new DeviceId(UUID.randomUUID());
lastActivityEventTimestamps.put(nonLocalDeviceId2, Duration.ofHours(32L));
given(ctxMock.isLocalEntity(eq(nonLocalDeviceId2))).willReturn(false);
// WHEN
node.onPartitionChangeMsg(ctxMock, new PartitionChangeMsg(ServiceType.TB_RULE_ENGINE));
// THEN
assertThat(lastActivityEventTimestamps)
.containsKey(DEVICE_ID)
.doesNotContainKey(nonLocalDeviceId1)
.doesNotContainKey(nonLocalDeviceId2)
.size().isOne();
}
@ParameterizedTest
@EnumSource(
value = TbMsgType.class,
@ -117,12 +234,8 @@ public class TbDeviceStateNodeTest {
mode = EnumSource.Mode.EXCLUDE
)
public void givenUnsupportedEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException(TbMsgType unsupportedEvent) {
// GIVEN
config.setEvent(unsupportedEvent);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
// WHEN-THEN
assertThatThrownBy(() -> node.init(ctxMock, nodeConfig))
// GIVEN-WHEN-THEN
assertThatThrownBy(() -> initNode(unsupportedEvent))
.isInstanceOf(TbNodeException.class)
.hasMessage("Unsupported event: " + unsupportedEvent)
.matches(e -> ((TbNodeException) e).isUnrecoverable());
@ -156,17 +269,19 @@ public class TbDeviceStateNodeTest {
@ParameterizedTest
@MethodSource
public void givenInactivityEventAndDeviceOriginator_whenOnMsg_thenOnDeviceInactivityIsCalledWithCorrectCallback(TbMsgType supportedEventType, Runnable actionVerification) {
public void givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback(TbMsgType supportedEventType, Runnable actionVerification) {
// GIVEN
config.setEvent(supportedEventType);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
given(ctxMock.getSelfId()).willReturn(nodeId);
given(ctxMock.newMsg(isNull(), eq(TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG), eq(nodeId), eq(TbMsgMetaData.EMPTY), eq(TbMsg.EMPTY_STRING))).willReturn(cleanupMsg);
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
given(ctxMock.getDeviceStateManager()).willReturn(deviceStateManagerMock);
try {
node.init(ctxMock, nodeConfig);
initNode(supportedEventType);
} catch (TbNodeException e) {
fail("Node failed to initialize!", e);
}
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
given(ctxMock.getDeviceStateManager()).willReturn(deviceStateManagerMock);
verifyCleanupMsgSent();
// WHEN
node.onMsg(ctxMock, msg);
@ -188,7 +303,7 @@ public class TbDeviceStateNodeTest {
then(ctxMock).shouldHaveNoMoreInteractions();
}
private static Stream<Arguments> givenInactivityEventAndDeviceOriginator_whenOnMsg_thenOnDeviceInactivityIsCalledWithCorrectCallback() {
private static Stream<Arguments> givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback() {
return Stream.of(
Arguments.of(TbMsgType.CONNECT_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceConnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.ACTIVITY_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
@ -197,4 +312,16 @@ public class TbDeviceStateNodeTest {
);
}
private void initNode(TbMsgType event) throws TbNodeException {
config.setEvent(event);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
node.init(ctxMock, nodeConfig);
}
private void verifyCleanupMsgSent() {
then(ctxMock).should().getSelfId();
then(ctxMock).should().newMsg(isNull(), eq(TbMsgType.DEVICE_STATE_STALE_ENTRIES_CLEANUP_MSG), eq(nodeId), eq(TbMsgMetaData.EMPTY), eq(TbMsg.EMPTY_STRING));
then(ctxMock).should().tellSelf(eq(cleanupMsg), eq(Duration.ofHours(1L).toMillis()));
}
}