Add optimization using separate service which routes activity actions to device state service directly, when running in monolith. Refactor rule node tests

This commit is contained in:
Dmytro Skarzhynets 2024-01-26 16:15:11 +02:00 committed by Dmytro Skarzhynets
parent 3c460d79c3
commit eadcf916f5
11 changed files with 592 additions and 145 deletions

View File

@ -31,6 +31,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.SmsService;
import org.thingsboard.rule.engine.api.slack.SlackService;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
@ -203,6 +204,10 @@ public class ActorSystemContext {
@Getter
private DeviceCredentialsService deviceCredentialsService;
@Autowired
@Getter
private RuleEngineDeviceStateManager deviceStateManager;
@Autowired
@Getter
private TbTenantProfileCache tenantProfileCache;

View File

@ -28,6 +28,7 @@ import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService;
import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.ScriptEngine;
@ -683,6 +684,11 @@ class DefaultTbContext implements TbContext {
return mainCtx.getDeviceCredentialsService();
}
@Override
public RuleEngineDeviceStateManager getDeviceStateManager() {
return mainCtx.getDeviceStateManager();
}
@Override
public TbClusterService getClusterService() {
return mainCtx.getClusterService();

View File

@ -0,0 +1,110 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.state;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
@Slf4j
@Service
@TbRuleEngineComponent
@RequiredArgsConstructor
public class ClusteredRuleEngineDeviceStateManager implements RuleEngineDeviceStateManager {
private final TbClusterService clusterService;
@Override
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback) {
var tenantUuid = tenantId.getId();
var deviceUuid = deviceId.getId();
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantUuid.getMostSignificantBits())
.setTenantIdLSB(tenantUuid.getLeastSignificantBits())
.setDeviceIdMSB(deviceUuid.getMostSignificantBits())
.setDeviceIdLSB(deviceUuid.getLeastSignificantBits())
.setLastConnectTime(connectTime)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceConnectMsg(deviceConnectMsg)
.build();
log.trace("[{}][{}] Sending device connect message to core. Connect time: [{}].", tenantUuid, deviceUuid, connectTime);
clusterService.pushMsgToCore(tenantId, deviceId, toCoreMsg, new SimpleTbQueueCallback(__ -> callback.onSuccess(), callback::onFailure));
}
@Override
public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback) {
var tenantUuid = tenantId.getId();
var deviceUuid = deviceId.getId();
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantUuid.getMostSignificantBits())
.setTenantIdLSB(tenantUuid.getLeastSignificantBits())
.setDeviceIdMSB(deviceUuid.getMostSignificantBits())
.setDeviceIdLSB(deviceUuid.getLeastSignificantBits())
.setLastActivityTime(activityTime)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceActivityMsg(deviceActivityMsg)
.build();
log.trace("[{}][{}] Sending device activity message to core. Activity time: [{}].", tenantUuid, deviceUuid, activityTime);
clusterService.pushMsgToCore(tenantId, deviceId, toCoreMsg, new SimpleTbQueueCallback(__ -> callback.onSuccess(), callback::onFailure));
}
@Override
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback) {
var tenantUuid = tenantId.getId();
var deviceUuid = deviceId.getId();
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantUuid.getMostSignificantBits())
.setTenantIdLSB(tenantUuid.getLeastSignificantBits())
.setDeviceIdMSB(deviceUuid.getMostSignificantBits())
.setDeviceIdLSB(deviceUuid.getLeastSignificantBits())
.setLastDisconnectTime(disconnectTime)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceDisconnectMsg(deviceDisconnectMsg)
.build();
log.trace("[{}][{}] Sending device disconnect message to core. Disconnect time: [{}].", tenantUuid, deviceUuid, disconnectTime);
clusterService.pushMsgToCore(tenantId, deviceId, toCoreMsg, new SimpleTbQueueCallback(__ -> callback.onSuccess(), callback::onFailure));
}
@Override
public void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback) {
var tenantUuid = tenantId.getId();
var deviceUuid = deviceId.getId();
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantUuid.getMostSignificantBits())
.setTenantIdLSB(tenantUuid.getLeastSignificantBits())
.setDeviceIdMSB(deviceUuid.getMostSignificantBits())
.setDeviceIdLSB(deviceUuid.getLeastSignificantBits())
.setLastInactivityTime(inactivityTime)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityMsg(deviceInactivityMsg)
.build();
log.trace("[{}][{}] Sending device inactivity message to core. Inactivity time: [{}].", tenantUuid, deviceUuid, inactivityTime);
clusterService.pushMsgToCore(tenantId, deviceId, toCoreMsg, new SimpleTbQueueCallback(__ -> callback.onSuccess(), callback::onFailure));
}
}

View File

@ -0,0 +1,85 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.state;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.queue.util.TbCoreComponent;
@Slf4j
@Service
@Primary
@TbCoreComponent
@RequiredArgsConstructor
public class LocalRuleEngineDeviceStateManager implements RuleEngineDeviceStateManager {
private final DeviceStateService deviceStateService;
@Override
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback) {
try {
deviceStateService.onDeviceConnect(tenantId, deviceId, connectTime);
} catch (Exception e) {
log.error("[{}][{}] Failed to process device connect event. Connect time: [{}].", tenantId.getId(), deviceId.getId(), connectTime, e);
callback.onFailure(e);
return;
}
callback.onSuccess();
}
@Override
public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback) {
try {
deviceStateService.onDeviceActivity(tenantId, deviceId, activityTime);
} catch (Exception e) {
log.error("[{}][{}] Failed to process device activity event. Activity time: [{}].", tenantId.getId(), deviceId.getId(), activityTime, e);
callback.onFailure(e);
return;
}
callback.onSuccess();
}
@Override
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback) {
try {
deviceStateService.onDeviceDisconnect(tenantId, deviceId, disconnectTime);
} catch (Exception e) {
log.error("[{}][{}] Failed to process device disconnect event. Disconnect time: [{}].", tenantId.getId(), deviceId.getId(), disconnectTime, e);
callback.onFailure(e);
return;
}
callback.onSuccess();
}
@Override
public void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback) {
try {
deviceStateService.onDeviceInactivity(tenantId, deviceId, inactivityTime);
} catch (Exception e) {
log.error("[{}][{}] Failed to process device inactivity event. Inactivity time: [{}].", tenantId.getId(), deviceId.getId(), inactivityTime, e);
callback.onFailure(e);
return;
}
callback.onSuccess();
}
}

View File

@ -0,0 +1,150 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.state;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import java.util.UUID;
import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.then;
@ExtendWith(MockitoExtension.class)
public class ClusteredRuleEngineDeviceStateManagerTest {
@Mock
private static TbClusterService tbClusterServiceMock;
@Mock
private static TbCallback tbCallbackMock;
@Mock
private static TbQueueMsgMetadata metadataMock;
@Captor
private static ArgumentCaptor<TbQueueCallback> queueCallbackCaptor;
private static ClusteredRuleEngineDeviceStateManager deviceStateManager;
private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("57ab2e6c-bc4c-11ee-a506-0242ac120002"));
private static final DeviceId DEVICE_ID = DeviceId.fromString("74a9053e-bc4c-11ee-a506-0242ac120002");
private static final long EVENT_TS = System.currentTimeMillis();
@BeforeEach
public void setup() {
deviceStateManager = new ClusteredRuleEngineDeviceStateManager(tbClusterServiceMock);
}
@ParameterizedTest
@MethodSource
public void givenProcessingSuccess_whenOnDeviceAction_thenCallsDeviceStateServiceAndOnSuccessCallback(Runnable onDeviceAction, Runnable actionVerification) {
// WHEN
onDeviceAction.run();
// THEN
actionVerification.run();
TbQueueCallback callback = queueCallbackCaptor.getValue();
callback.onSuccess(metadataMock);
then(tbCallbackMock).should().onSuccess();
var runtimeException = new RuntimeException("Something bad happened!");
callback.onFailure(runtimeException);
then(tbCallbackMock).should().onFailure(runtimeException);
}
private static Stream<Arguments> givenProcessingSuccess_whenOnDeviceAction_thenCallsDeviceStateServiceAndOnSuccessCallback() {
return Stream.of(
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> {
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setLastConnectTime(EVENT_TS)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceConnectMsg(deviceConnectMsg)
.build();
then(tbClusterServiceMock).should().pushMsgToCore(eq(TENANT_ID), eq(DEVICE_ID), eq(toCoreMsg), queueCallbackCaptor.capture());
}
),
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> {
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setLastActivityTime(EVENT_TS)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceActivityMsg(deviceActivityMsg)
.build();
then(tbClusterServiceMock).should().pushMsgToCore(eq(TENANT_ID), eq(DEVICE_ID), eq(toCoreMsg), queueCallbackCaptor.capture());
}
),
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> {
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setLastDisconnectTime(EVENT_TS)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceDisconnectMsg(deviceDisconnectMsg)
.build();
then(tbClusterServiceMock).should().pushMsgToCore(eq(TENANT_ID), eq(DEVICE_ID), eq(toCoreMsg), queueCallbackCaptor.capture());
}
),
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> {
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setLastInactivityTime(EVENT_TS)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityMsg(deviceInactivityMsg)
.build();
then(tbClusterServiceMock).should().pushMsgToCore(eq(TENANT_ID), eq(DEVICE_ID), eq(toCoreMsg), queueCallbackCaptor.capture());
}
)
);
}
}

View File

@ -0,0 +1,131 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.state;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
import java.util.UUID;
import java.util.stream.Stream;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
@ExtendWith(MockitoExtension.class)
public class LocalRuleEngineDeviceStateManagerTest {
@Mock
private static DeviceStateService deviceStateServiceMock;
@Mock
private static TbCallback tbCallbackMock;
private static LocalRuleEngineDeviceStateManager deviceStateManager;
private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("57ab2e6c-bc4c-11ee-a506-0242ac120002"));
private static final DeviceId DEVICE_ID = DeviceId.fromString("74a9053e-bc4c-11ee-a506-0242ac120002");
private static final long EVENT_TS = System.currentTimeMillis();
private static final RuntimeException RUNTIME_EXCEPTION = new RuntimeException("Something bad happened!");
@BeforeEach
public void setup() {
deviceStateManager = new LocalRuleEngineDeviceStateManager(deviceStateServiceMock);
}
@ParameterizedTest
@MethodSource
public void givenProcessingSuccess_whenOnDeviceAction_thenCallsDeviceStateServiceAndOnSuccessCallback(Runnable onDeviceAction, Runnable actionVerification) {
// WHEN
onDeviceAction.run();
// THEN
actionVerification.run();
then(tbCallbackMock).should().onSuccess();
then(tbCallbackMock).should(never()).onFailure(any());
}
private static Stream<Arguments> givenProcessingSuccess_whenOnDeviceAction_thenCallsDeviceStateServiceAndOnSuccessCallback() {
return Stream.of(
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Runnable) () -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS)
)
);
}
@ParameterizedTest
@MethodSource
public void givenProcessingFailure_whenOnDeviceAction_thenCallsDeviceStateServiceAndOnFailureCallback(
Runnable exceptionThrowSetup, Runnable onDeviceAction, Runnable actionVerification
) {
// GIVEN
exceptionThrowSetup.run();
// WHEN
onDeviceAction.run();
// THEN
actionVerification.run();
then(tbCallbackMock).should(never()).onSuccess();
then(tbCallbackMock).should().onFailure(RUNTIME_EXCEPTION);
}
private static Stream<Arguments> givenProcessingFailure_whenOnDeviceAction_thenCallsDeviceStateServiceAndOnFailureCallback() {
return Stream.of(
Arguments.of(
(Runnable) () -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS),
(Runnable) () -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Runnable) () -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS),
(Runnable) () -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Runnable) () -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS),
(Runnable) () -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Runnable) () -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS),
(Runnable) () -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Runnable) () -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS)
)
);
}
}

View File

@ -335,7 +335,7 @@ public final class TbMsg implements Serializable {
this.originator = originator;
if (customerId == null || customerId.isNullUid()) {
if (originator != null && originator.getEntityType() == EntityType.CUSTOMER) {
this.customerId = (CustomerId) originator;
this.customerId = new CustomerId(originator.getId());
} else {
this.customerId = null;
}

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
public interface RuleEngineDeviceStateManager {
void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback);
void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback);
void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback);
void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback);
}

View File

@ -280,6 +280,8 @@ public interface TbContext {
DeviceCredentialsService getDeviceCredentialsService();
RuleEngineDeviceStateManager getDeviceStateManager();
TbClusterService getClusterService();
DashboardService getDashboardService();

View File

@ -16,6 +16,7 @@
package org.thingsboard.rule.engine.action;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
@ -23,12 +24,12 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
import java.util.EnumSet;
import java.util.Set;
@ -61,7 +62,7 @@ import java.util.Set;
)
public class TbDeviceStateNode implements TbNode {
static final Set<TbMsgType> SUPPORTED_EVENTS = EnumSet.of(
private static final Set<TbMsgType> SUPPORTED_EVENTS = EnumSet.of(
TbMsgType.CONNECT_EVENT, TbMsgType.ACTIVITY_EVENT, TbMsgType.DISCONNECT_EVENT, TbMsgType.INACTIVITY_EVENT
);
@ -85,98 +86,41 @@ public class TbDeviceStateNode implements TbNode {
ctx.tellSuccess(msg);
return;
}
TenantId tenantId = ctx.getTenantId();
DeviceId deviceId = (DeviceId) msg.getOriginator();
RuleEngineDeviceStateManager deviceStateManager = ctx.getDeviceStateManager();
switch (event) {
case CONNECT_EVENT:
sendDeviceConnectMsg(ctx, msg);
deviceStateManager.onDeviceConnect(tenantId, deviceId, msg.getMetaDataTs(), getMsgEnqueuedCallback(ctx, msg));
break;
case ACTIVITY_EVENT:
sendDeviceActivityMsg(ctx, msg);
deviceStateManager.onDeviceActivity(tenantId, deviceId, msg.getMetaDataTs(), getMsgEnqueuedCallback(ctx, msg));
break;
case DISCONNECT_EVENT:
sendDeviceDisconnectMsg(ctx, msg);
deviceStateManager.onDeviceDisconnect(tenantId, deviceId, msg.getMetaDataTs(), getMsgEnqueuedCallback(ctx, msg));
break;
case INACTIVITY_EVENT:
sendDeviceInactivityMsg(ctx, msg);
deviceStateManager.onDeviceInactivity(tenantId, deviceId, msg.getMetaDataTs(), getMsgEnqueuedCallback(ctx, msg));
break;
default:
ctx.tellFailure(msg, new IllegalStateException("Configured event [" + event + "] is not supported!"));
}
}
private void sendDeviceConnectMsg(TbContext ctx, TbMsg msg) {
var tenantUuid = ctx.getTenantId().getId();
var deviceUuid = msg.getOriginator().getId();
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantUuid.getMostSignificantBits())
.setTenantIdLSB(tenantUuid.getLeastSignificantBits())
.setDeviceIdMSB(deviceUuid.getMostSignificantBits())
.setDeviceIdLSB(deviceUuid.getLeastSignificantBits())
.setLastConnectTime(msg.getMetaDataTs())
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceConnectMsg(deviceConnectMsg)
.build();
ctx.getClusterService().pushMsgToCore(
ctx.getTenantId(), msg.getOriginator(), toCoreMsg, getMsgEnqueuedCallback(ctx, msg)
);
}
private TbCallback getMsgEnqueuedCallback(TbContext ctx, TbMsg msg) {
return new TbCallback() {
@Override
public void onSuccess() {
ctx.tellSuccess(msg);
}
private void sendDeviceActivityMsg(TbContext ctx, TbMsg msg) {
var tenantUuid = ctx.getTenantId().getId();
var deviceUuid = msg.getOriginator().getId();
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantUuid.getMostSignificantBits())
.setTenantIdLSB(tenantUuid.getLeastSignificantBits())
.setDeviceIdMSB(deviceUuid.getMostSignificantBits())
.setDeviceIdLSB(deviceUuid.getLeastSignificantBits())
.setLastActivityTime(msg.getMetaDataTs())
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceActivityMsg(deviceActivityMsg)
.build();
ctx.getClusterService().pushMsgToCore(
ctx.getTenantId(), msg.getOriginator(), toCoreMsg, getMsgEnqueuedCallback(ctx, msg)
);
}
private void sendDeviceDisconnectMsg(TbContext ctx, TbMsg msg) {
var tenantUuid = ctx.getTenantId().getId();
var deviceUuid = msg.getOriginator().getId();
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantUuid.getMostSignificantBits())
.setTenantIdLSB(tenantUuid.getLeastSignificantBits())
.setDeviceIdMSB(deviceUuid.getMostSignificantBits())
.setDeviceIdLSB(deviceUuid.getLeastSignificantBits())
.setLastDisconnectTime(msg.getMetaDataTs())
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceDisconnectMsg(deviceDisconnectMsg)
.build();
ctx.getClusterService().pushMsgToCore(
ctx.getTenantId(), msg.getOriginator(), toCoreMsg, getMsgEnqueuedCallback(ctx, msg)
);
}
private void sendDeviceInactivityMsg(TbContext ctx, TbMsg msg) {
var tenantUuid = ctx.getTenantId().getId();
var deviceUuid = msg.getOriginator().getId();
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantUuid.getMostSignificantBits())
.setTenantIdLSB(tenantUuid.getLeastSignificantBits())
.setDeviceIdMSB(deviceUuid.getMostSignificantBits())
.setDeviceIdLSB(deviceUuid.getLeastSignificantBits())
.setLastInactivityTime(msg.getMetaDataTs())
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityMsg(deviceInactivityMsg)
.build();
ctx.getClusterService().pushMsgToCore(
ctx.getTenantId(), msg.getOriginator(), toCoreMsg, getMsgEnqueuedCallback(ctx, msg)
);
}
private TbQueueCallback getMsgEnqueuedCallback(TbContext ctx, TbMsg msg) {
return new SimpleTbQueueCallback(metadata -> ctx.tellSuccess(msg), t -> ctx.tellFailure(msg, t));
@Override
public void onFailure(Throwable t) {
ctx.tellFailure(msg, t);
}
};
}
}

View File

@ -20,28 +20,28 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.common.msg.queue.TbCallback;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;
@ -51,7 +51,6 @@ import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.times;
@ExtendWith(MockitoExtension.class)
public class TbDeviceStateNodeTest {
@ -59,7 +58,9 @@ public class TbDeviceStateNodeTest {
@Mock
private TbContext ctxMock;
@Mock
private TbClusterService tbClusterServiceMock;
private static RuleEngineDeviceStateManager deviceStateManagerMock;
@Captor
private static ArgumentCaptor<TbCallback> callbackCaptor;
private TbDeviceStateNode node;
private TbDeviceStateNodeConfiguration config;
@ -94,10 +95,6 @@ public class TbDeviceStateNodeTest {
@Test
public void givenDefaultConfiguration_whenInvoked_thenCorrectValuesAreSet() {
assertThat(config.getEvent()).isEqualTo(TbMsgType.ACTIVITY_EVENT);
assertThat(TbDeviceStateNode.SUPPORTED_EVENTS).isEqualTo(Set.of(
TbMsgType.CONNECT_EVENT, TbMsgType.ACTIVITY_EVENT,
TbMsgType.DISCONNECT_EVENT, TbMsgType.INACTIVITY_EVENT
));
}
@Test
@ -113,10 +110,14 @@ public class TbDeviceStateNodeTest {
.matches(e -> ((TbNodeException) e).isUnrecoverable());
}
@Test
public void givenUnsupportedEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException() {
@ParameterizedTest
@EnumSource(
value = TbMsgType.class,
names = {"CONNECT_EVENT", "ACTIVITY_EVENT", "DISCONNECT_EVENT", "INACTIVITY_EVENT"},
mode = EnumSource.Mode.EXCLUDE
)
public void givenUnsupportedEventInConfig_whenInit_thenThrowsUnrecoverableTbNodeException(TbMsgType unsupportedEvent) {
// GIVEN
var unsupportedEvent = TbMsgType.TO_SERVER_RPC_REQUEST;
config.setEvent(unsupportedEvent);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
@ -127,11 +128,23 @@ public class TbDeviceStateNodeTest {
.matches(e -> ((TbNodeException) e).isUnrecoverable());
}
@Test
public void givenNonDeviceOriginator_whenOnMsg_thenTellsSuccessAndNoActivityActionsTriggered() {
@ParameterizedTest
@EnumSource(value = EntityType.class, names = "DEVICE", mode = EnumSource.Mode.EXCLUDE)
public void givenNonDeviceOriginator_whenOnMsg_thenTellsSuccessAndNoActivityActionsTriggered(EntityType unsupportedType) {
// GIVEN
var asset = new AssetId(UUID.randomUUID());
var msg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, asset, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
var nonDeviceOriginator = new EntityId() {
@Override
public UUID getId() {
return UUID.randomUUID();
}
@Override
public EntityType getEntityType() {
return unsupportedType;
}
};
var msg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, nonDeviceOriginator, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
// WHEN
node.onMsg(ctxMock, msg);
@ -142,10 +155,10 @@ public class TbDeviceStateNodeTest {
}
@ParameterizedTest
@MethodSource("provideSupportedEventsAndExpectedMessages")
public void givenSupportedEvent_whenOnMsg_thenCorrectMsgIsSent(TbMsgType event, TransportProtos.ToCoreMsg expectedToCoreMsg) {
@MethodSource
public void givenInactivityEventAndDeviceOriginator_whenOnMsg_thenOnDeviceInactivityIsCalledWithCorrectCallback(TbMsgType supportedEventType, Runnable actionVerification) {
// GIVEN
config.setEvent(event);
config.setEvent(supportedEventType);
var nodeConfig = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
try {
node.init(ctxMock, nodeConfig);
@ -153,65 +166,34 @@ public class TbDeviceStateNodeTest {
fail("Node failed to initialize!", e);
}
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
given(ctxMock.getClusterService()).willReturn(tbClusterServiceMock);
given(ctxMock.getDeviceStateManager()).willReturn(deviceStateManagerMock);
// WHEN
node.onMsg(ctxMock, msg);
// THEN
var protoCaptor = ArgumentCaptor.forClass(TransportProtos.ToCoreMsg.class);
var callbackCaptor = ArgumentCaptor.forClass(TbQueueCallback.class);
then(tbClusterServiceMock).should().pushMsgToCore(eq(TENANT_ID), eq(DEVICE_ID), protoCaptor.capture(), callbackCaptor.capture());
actionVerification.run();
TbQueueCallback actualCallback = callbackCaptor.getValue();
TbCallback actualCallback = callbackCaptor.getValue();
actualCallback.onSuccess(null);
actualCallback.onSuccess();
then(ctxMock).should().tellSuccess(msg);
var throwable = new Throwable();
actualCallback.onFailure(throwable);
then(ctxMock).should().tellFailure(msg, throwable);
assertThat(expectedToCoreMsg).isEqualTo(protoCaptor.getValue());
then(tbClusterServiceMock).shouldHaveNoMoreInteractions();
then(deviceStateManagerMock).shouldHaveNoMoreInteractions();
then(ctxMock).shouldHaveNoMoreInteractions();
}
private static Stream<Arguments> provideSupportedEventsAndExpectedMessages() {
private static Stream<Arguments> givenInactivityEventAndDeviceOriginator_whenOnMsg_thenOnDeviceInactivityIsCalledWithCorrectCallback() {
return Stream.of(
Arguments.of(TbMsgType.CONNECT_EVENT, TransportProtos.ToCoreMsg.newBuilder().setDeviceConnectMsg(
TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setLastConnectTime(METADATA_TS)
.build()).build()),
Arguments.of(TbMsgType.ACTIVITY_EVENT, TransportProtos.ToCoreMsg.newBuilder().setDeviceActivityMsg(
TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setLastActivityTime(METADATA_TS)
.build()).build()),
Arguments.of(TbMsgType.DISCONNECT_EVENT, TransportProtos.ToCoreMsg.newBuilder().setDeviceDisconnectMsg(
TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setLastDisconnectTime(METADATA_TS)
.build()).build()),
Arguments.of(TbMsgType.INACTIVITY_EVENT, TransportProtos.ToCoreMsg.newBuilder().setDeviceInactivityMsg(
TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setLastInactivityTime(METADATA_TS)
.build()).build())
Arguments.of(TbMsgType.CONNECT_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceConnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.ACTIVITY_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.DISCONNECT_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceDisconnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.INACTIVITY_EVENT, (Runnable) () -> then(deviceStateManagerMock).should().onDeviceInactivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture()))
);
}