Used Java serialization for NotificationRuleTrigger

This commit is contained in:
YevhenBondarenko 2024-01-09 12:31:23 +01:00
parent dd71198908
commit 720afebeeb
4 changed files with 12 additions and 11 deletions

View File

@ -28,6 +28,7 @@ import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.JavaSerDesUtil;
import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.common.data.alarm.AlarmInfo;
import org.thingsboard.server.common.data.event.ErrorEvent; import org.thingsboard.server.common.data.event.ErrorEvent;
import org.thingsboard.server.common.data.event.Event; import org.thingsboard.server.common.data.event.Event;
@ -383,7 +384,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
forwardToSubMgrService(toCoreNotification.getToSubscriptionMgrMsg(), callback); forwardToSubMgrService(toCoreNotification.getToSubscriptionMgrMsg(), callback);
} else if (toCoreNotification.hasNotificationRuleProcessorMsg()) { } else if (toCoreNotification.hasNotificationRuleProcessorMsg()) {
NotificationRuleTrigger notificationRuleTrigger = NotificationRuleTrigger notificationRuleTrigger =
JacksonUtil.fromBytes(toCoreNotification.getNotificationRuleProcessorMsg().getTrigger().toByteArray(), NotificationRuleTrigger.class); JavaSerDesUtil.decode(toCoreNotification.getNotificationRuleProcessorMsg().getTrigger().toByteArray());
notificationRuleProcessor.process(notificationRuleTrigger); notificationRuleProcessor.process(notificationRuleTrigger);
callback.onSuccess(); callback.onSuccess();
} else if (toCoreNotification.hasResourceCacheInvalidateMsg()) { } else if (toCoreNotification.hasResourceCacheInvalidateMsg()) {

View File

@ -1506,7 +1506,6 @@ message NotificationSchedulerServiceMsg {
} }
message NotificationRuleProcessorMsg { message NotificationRuleProcessorMsg {
//Json
bytes trigger = 1; bytes trigger = 1;
} }

View File

@ -20,7 +20,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.JavaSerDesUtil;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.ServiceType;
@ -53,7 +53,7 @@ public class RemoteNotificationRuleProcessor implements NotificationRuleProcesso
log.debug("Submitting notification rule trigger: {}", trigger); log.debug("Submitting notification rule trigger: {}", trigger);
TransportProtos.NotificationRuleProcessorMsg.Builder msg = TransportProtos.NotificationRuleProcessorMsg.newBuilder() TransportProtos.NotificationRuleProcessorMsg.Builder msg = TransportProtos.NotificationRuleProcessorMsg.newBuilder()
.setTrigger(ByteString.copyFrom(JacksonUtil.writeValueAsBytes(trigger))); .setTrigger(ByteString.copyFrom(JavaSerDesUtil.encode(trigger)));
partitionService.getAllServiceIds(ServiceType.TB_CORE).stream().findAny().ifPresent(serviceId -> { partitionService.getAllServiceIds(ServiceType.TB_CORE).stream().findAny().ifPresent(serviceId -> {
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId); TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);

View File

@ -24,7 +24,6 @@ import org.junit.jupiter.api.Assertions;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityInfo; import org.thingsboard.server.common.data.EntityInfo;
import org.thingsboard.server.common.data.JavaSerDesUtil;
import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
@ -38,12 +37,15 @@ import org.thingsboard.server.common.data.queue.SubmitStrategyType;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -300,19 +302,18 @@ public class TenantProfileServiceTest extends AbstractServiceTest {
} }
@Test @Test
public void testTenantProfileSerialization_fst() { public void testTenantProfileSerialization_proto() {
TenantProfile tenantProfile = new TenantProfile(); TenantProfile tenantProfile = new TenantProfile();
tenantProfile.setId(new TenantProfileId(UUID.randomUUID()));
tenantProfile.setName("testProfile");
TenantProfileData profileData = new TenantProfileData(); TenantProfileData profileData = new TenantProfileData();
tenantProfile.setProfileData(profileData); tenantProfile.setProfileData(profileData);
profileData.setConfiguration(new DefaultTenantProfileConfiguration()); profileData.setConfiguration(new DefaultTenantProfileConfiguration());
addMainQueueConfig(tenantProfile); addMainQueueConfig(tenantProfile);
byte[] serialized = assertDoesNotThrow(() -> JavaSerDesUtil.encode(tenantProfile)); byte[] serialized = assertDoesNotThrow(() -> ProtoUtils.toProto(tenantProfile).toByteArray());
assertDoesNotThrow(() -> {
JavaSerDesUtil.encode(profileData);
});
TenantProfile deserialized = assertDoesNotThrow(() -> JavaSerDesUtil.decode(serialized)); TenantProfile deserialized = assertDoesNotThrow(() -> ProtoUtils.fromProto(TransportProtos.TenantProfileProto.parseFrom(serialized)));
assertThat(deserialized).isEqualTo(tenantProfile); assertThat(deserialized).isEqualTo(tenantProfile);
assertThat(deserialized.getProfileData()).isNotNull(); assertThat(deserialized.getProfileData()).isNotNull();
assertThat(deserialized.getProfileData().getQueueConfiguration()).isNotEmpty(); assertThat(deserialized.getProfileData().getQueueConfiguration()).isNotEmpty();