Merge pull request #9430 from smatvienko-tb/feature/componentLifeCycleProto
ComponentLifecycleMsgProto implemented as replacement of raw byte encoding
This commit is contained in:
commit
f47dff7fa9
@ -395,7 +395,7 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
}
|
||||
|
||||
private void broadcast(ComponentLifecycleMsg msg) {
|
||||
byte[] msgBytes = encodingService.encode(msg);
|
||||
TransportProtos.ComponentLifecycleMsgProto componentLifecycleMsgProto = ProtoUtils.toProto(msg);
|
||||
TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer();
|
||||
Set<String> tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE);
|
||||
EntityType entityType = msg.getEntityId().getEntityType();
|
||||
@ -413,7 +413,7 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
|
||||
for (String serviceId : tbCoreServices) {
|
||||
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
|
||||
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build();
|
||||
ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setComponentLifecycle(componentLifecycleMsgProto).build();
|
||||
toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toCoreMsg), null);
|
||||
toCoreNfs.incrementAndGet();
|
||||
}
|
||||
@ -422,7 +422,7 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
}
|
||||
for (String serviceId : tbRuleEngineServices) {
|
||||
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId);
|
||||
ToRuleEngineNotificationMsg toRuleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build();
|
||||
ToRuleEngineNotificationMsg toRuleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setComponentLifecycle(componentLifecycleMsgProto).build();
|
||||
toRuleEngineProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg), null);
|
||||
toRuleEngineNfs.incrementAndGet();
|
||||
}
|
||||
|
||||
@ -346,7 +346,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
} else if (toCoreNotification.hasFromDeviceRpcResponse()) {
|
||||
log.trace("[{}] Forwarding message to RPC service {}", id, toCoreNotification.getFromDeviceRpcResponse());
|
||||
forwardToCoreRpcService(toCoreNotification.getFromDeviceRpcResponse(), callback);
|
||||
} else if (toCoreNotification.hasComponentLifecycle()) {
|
||||
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCoreNotification.getComponentLifecycle()));
|
||||
callback.onSuccess();
|
||||
} else if (toCoreNotification.getComponentLifecycleMsg() != null && !toCoreNotification.getComponentLifecycleMsg().isEmpty()) {
|
||||
//will be removed in 3.6.1 in favour of hasComponentLifecycle()
|
||||
handleComponentLifecycleMsg(id, toCoreNotification.getComponentLifecycleMsg());
|
||||
callback.onSuccess();
|
||||
} else if (!toCoreNotification.getEdgeEventUpdateMsg().isEmpty()) {
|
||||
|
||||
@ -423,7 +423,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
@Override
|
||||
protected void handleNotification(UUID id, TbProtoQueueMsg<ToRuleEngineNotificationMsg> msg, TbCallback callback) throws Exception {
|
||||
ToRuleEngineNotificationMsg nfMsg = msg.getValue();
|
||||
if (nfMsg.getComponentLifecycleMsg() != null && !nfMsg.getComponentLifecycleMsg().isEmpty()) {
|
||||
if (nfMsg.hasComponentLifecycle()) {
|
||||
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(nfMsg.getComponentLifecycle()));
|
||||
callback.onSuccess();
|
||||
} else if (nfMsg.getComponentLifecycleMsg() != null && !nfMsg.getComponentLifecycleMsg().isEmpty()) {
|
||||
//will be removed in 3.6.1 in favour of hasComponentLifecycle()
|
||||
handleComponentLifecycleMsg(id, nfMsg.getComponentLifecycleMsg());
|
||||
callback.onSuccess();
|
||||
} else if (nfMsg.hasFromDeviceRpcResponse()) {
|
||||
|
||||
@ -0,0 +1,47 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.queue;
|
||||
|
||||
import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
public class ProtoUtils {
|
||||
|
||||
public static TransportProtos.ComponentLifecycleMsgProto toProto(ComponentLifecycleMsg msg) {
|
||||
return TransportProtos.ComponentLifecycleMsgProto.newBuilder()
|
||||
.setTenantIdMSB(msg.getTenantId().getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(msg.getTenantId().getId().getLeastSignificantBits())
|
||||
.setEntityType(TransportProtos.EntityType.forNumber(msg.getEntityId().getEntityType().ordinal()))
|
||||
.setEntityIdMSB(msg.getEntityId().getId().getMostSignificantBits())
|
||||
.setEntityIdLSB(msg.getEntityId().getId().getLeastSignificantBits())
|
||||
.setEvent(TransportProtos.ComponentLifecycleEvent.forNumber(msg.getEvent().ordinal()))
|
||||
.build();
|
||||
}
|
||||
|
||||
public static ComponentLifecycleMsg fromProto(TransportProtos.ComponentLifecycleMsgProto proto) {
|
||||
return new ComponentLifecycleMsg(
|
||||
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
|
||||
EntityIdFactory.getByTypeAndUuid(proto.getEntityTypeValue(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())),
|
||||
ComponentLifecycleEvent.values()[proto.getEventValue()]
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
@ -168,6 +168,8 @@ public class TbCoreConsumerStats {
|
||||
toCoreNfSubscriptionServiceCounter.increment();
|
||||
} else if (msg.hasFromDeviceRpcResponse()) {
|
||||
toCoreNfDeviceRpcResponseCounter.increment();
|
||||
} else if (msg.hasComponentLifecycle()) {
|
||||
toCoreNfComponentLifecycleCounter.increment();
|
||||
} else if (!msg.getComponentLifecycleMsg().isEmpty()) {
|
||||
toCoreNfComponentLifecycleCounter.increment();
|
||||
} else if (!msg.getEdgeEventUpdateMsg().isEmpty()) {
|
||||
|
||||
@ -166,10 +166,13 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
||||
});
|
||||
}
|
||||
|
||||
// To be removed in 3.6.1 in favour of handleComponentLifecycleMsg(UUID id, TbActorMsg actorMsg)
|
||||
protected void handleComponentLifecycleMsg(UUID id, ByteString nfMsg) {
|
||||
Optional<TbActorMsg> actorMsgOpt = encodingService.decode(nfMsg.toByteArray());
|
||||
if (actorMsgOpt.isPresent()) {
|
||||
TbActorMsg actorMsg = actorMsgOpt.get();
|
||||
actorMsgOpt.ifPresent(tbActorMsg -> handleComponentLifecycleMsg(id, tbActorMsg));
|
||||
}
|
||||
|
||||
protected void handleComponentLifecycleMsg(UUID id, TbActorMsg actorMsg) {
|
||||
if (actorMsg instanceof ComponentLifecycleMsg) {
|
||||
ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg;
|
||||
log.debug("[{}][{}][{}] Received Lifecycle event: {}", componentLifecycleMsg.getTenantId(), componentLifecycleMsg.getEntityId().getEntityType(),
|
||||
@ -212,10 +215,9 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
||||
}
|
||||
eventPublisher.publishEvent(componentLifecycleMsg);
|
||||
}
|
||||
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg);
|
||||
log.trace("[{}] Forwarding component lifecycle message to App Actor {}", id, actorMsg);
|
||||
actorContext.tellWithHighPriority(actorMsg);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void handleNotification(UUID id, TbProtoQueueMsg<N> msg, TbCallback callback) throws Exception;
|
||||
|
||||
|
||||
@ -0,0 +1,71 @@
|
||||
/**
|
||||
* Copyright © 2016-2023 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.queue;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class ProtoUtilsTest {
|
||||
|
||||
TenantId tenantId = TenantId.fromUUID(UUID.fromString("35e10f77-16e7-424d-ae46-ee780f87ac4f"));
|
||||
EntityId entityId = new RuleChainId(UUID.fromString("c640b635-4f0f-41e6-b10b-25a86003094e"));
|
||||
@Test
|
||||
void toProtoComponentLifecycleMsg() {
|
||||
ComponentLifecycleMsg msg = new ComponentLifecycleMsg(tenantId, entityId, ComponentLifecycleEvent.UPDATED);
|
||||
|
||||
TransportProtos.ComponentLifecycleMsgProto proto = ProtoUtils.toProto(msg);
|
||||
|
||||
assertThat(proto).as("to proto").isEqualTo(TransportProtos.ComponentLifecycleMsgProto.newBuilder()
|
||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||
.setEntityType(TransportProtos.EntityType.forNumber(entityId.getEntityType().ordinal()))
|
||||
.setEntityIdMSB(entityId.getId().getMostSignificantBits())
|
||||
.setEntityIdLSB(entityId.getId().getLeastSignificantBits())
|
||||
.setEvent(TransportProtos.ComponentLifecycleEvent.forNumber(ComponentLifecycleEvent.UPDATED.ordinal()))
|
||||
.build()
|
||||
);
|
||||
|
||||
assertThat(ProtoUtils.fromProto(proto)).as("from proto").isEqualTo(msg);
|
||||
}
|
||||
|
||||
@Test
|
||||
void fromProtoComponentLifecycleMsg() {
|
||||
TransportProtos.ComponentLifecycleMsgProto proto = TransportProtos.ComponentLifecycleMsgProto.newBuilder()
|
||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||
.setEntityType(TransportProtos.EntityType.forNumber(entityId.getEntityType().ordinal()))
|
||||
.setEntityIdMSB(entityId.getId().getMostSignificantBits())
|
||||
.setEntityIdLSB(entityId.getId().getLeastSignificantBits())
|
||||
.setEvent(TransportProtos.ComponentLifecycleEvent.forNumber(ComponentLifecycleEvent.STARTED.ordinal()))
|
||||
.build();
|
||||
|
||||
ComponentLifecycleMsg msg = ProtoUtils.fromProto(proto);
|
||||
|
||||
assertThat(msg).as("from proto").isEqualTo(
|
||||
new ComponentLifecycleMsg(tenantId, entityId, ComponentLifecycleEvent.STARTED));
|
||||
|
||||
assertThat(ProtoUtils.toProto(msg)).as("to proto").isEqualTo(proto);
|
||||
}
|
||||
|
||||
}
|
||||
@ -20,6 +20,38 @@ package transport;
|
||||
option java_package = "org.thingsboard.server.gen.transport";
|
||||
option java_outer_classname = "TransportProtos";
|
||||
|
||||
/**
|
||||
* Common data structures
|
||||
*/
|
||||
enum EntityType {
|
||||
TENANT = 0;
|
||||
CUSTOMER = 1;
|
||||
USER = 2;
|
||||
DASHBOARD = 3;
|
||||
ASSET = 4;
|
||||
DEVICE = 5;
|
||||
ALARM = 6;
|
||||
RULE_CHAIN = 7;
|
||||
RULE_NODE = 8;
|
||||
ENTITY_VIEW = 9;
|
||||
WIDGETS_BUNDLE = 10;
|
||||
WIDGET_TYPE = 11;
|
||||
TENANT_PROFILE = 12;
|
||||
DEVICE_PROFILE = 13;
|
||||
ASSET_PROFILE = 14;
|
||||
API_USAGE_STATE = 15;
|
||||
TB_RESOURCE = 16;
|
||||
OTA_PACKAGE = 17;
|
||||
EDGE = 18;
|
||||
RPC = 19;
|
||||
QUEUE = 20;
|
||||
NOTIFICATION_TARGET = 21;
|
||||
NOTIFICATION_TEMPLATE = 22;
|
||||
NOTIFICATION_REQUEST = 23;
|
||||
NOTIFICATION = 24;
|
||||
NOTIFICATION_RULE = 25;
|
||||
}
|
||||
|
||||
/**
|
||||
* Service Discovery Data Structures;
|
||||
*/
|
||||
@ -731,6 +763,25 @@ message FromDeviceRPCResponseProto {
|
||||
int32 error = 4;
|
||||
}
|
||||
|
||||
enum ComponentLifecycleEvent {
|
||||
CREATED = 0;
|
||||
STARTED = 1;
|
||||
ACTIVATED = 2;
|
||||
SUSPENDED = 3;
|
||||
UPDATED = 4;
|
||||
STOPPED = 5;
|
||||
DELETED = 6;
|
||||
}
|
||||
|
||||
message ComponentLifecycleMsgProto {
|
||||
int64 tenantIdMSB = 1;
|
||||
int64 tenantIdLSB = 2;
|
||||
EntityType entityType = 3;
|
||||
int64 entityIdMSB = 4;
|
||||
int64 entityIdLSB = 5;
|
||||
ComponentLifecycleEvent event = 6;
|
||||
}
|
||||
|
||||
message EdgeNotificationMsgProto {
|
||||
int64 tenantIdMSB = 1;
|
||||
int64 tenantIdLSB = 2;
|
||||
@ -980,10 +1031,11 @@ message ToCoreMsg {
|
||||
}
|
||||
|
||||
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */
|
||||
/* Please, adjust the TbCoreConsumerStats when modifying the ToCoreNotificationMsg */
|
||||
message ToCoreNotificationMsg {
|
||||
LocalSubscriptionServiceMsgProto toLocalSubscriptionServiceMsg = 1;
|
||||
FromDeviceRPCResponseProto fromDeviceRpcResponse = 2;
|
||||
bytes componentLifecycleMsg = 3;
|
||||
bytes componentLifecycleMsg = 3; //will be removed in 3.6.1 in favour of ComponentLifecycleMsgProto
|
||||
bytes edgeEventUpdateMsg = 4;
|
||||
QueueUpdateMsg queueUpdateMsg = 5;
|
||||
QueueDeleteMsg queueDeleteMsg = 6;
|
||||
@ -992,6 +1044,7 @@ message ToCoreNotificationMsg {
|
||||
bytes fromEdgeSyncResponseMsg = 9;
|
||||
SubscriptionMgrMsgProto toSubscriptionMgrMsg = 10;
|
||||
NotificationRuleProcessorMsg notificationRuleProcessorMsg = 11;
|
||||
ComponentLifecycleMsgProto componentLifecycle = 12;
|
||||
}
|
||||
|
||||
/* Messages that are handled by ThingsBoard RuleEngine Service */
|
||||
@ -1004,10 +1057,11 @@ message ToRuleEngineMsg {
|
||||
}
|
||||
|
||||
message ToRuleEngineNotificationMsg {
|
||||
bytes componentLifecycleMsg = 1;
|
||||
bytes componentLifecycleMsg = 1; // will be removed in 3.6.1 in favour of ComponentLifecycleMsgProto
|
||||
FromDeviceRPCResponseProto fromDeviceRpcResponse = 2;
|
||||
QueueUpdateMsg queueUpdateMsg = 3;
|
||||
QueueDeleteMsg queueDeleteMsg = 4;
|
||||
ComponentLifecycleMsgProto componentLifecycle = 5;
|
||||
}
|
||||
|
||||
/* Messages that are handled by ThingsBoard Transport Service */
|
||||
|
||||
@ -26,6 +26,7 @@ import java.util.stream.Collectors;
|
||||
* @author Andrew Shvayka
|
||||
*/
|
||||
public enum EntityType {
|
||||
// In sync with EntityType proto
|
||||
TENANT,
|
||||
CUSTOMER,
|
||||
USER,
|
||||
|
||||
@ -29,6 +29,10 @@ public class EntityIdFactory {
|
||||
return getByTypeAndUuid(EntityType.values()[type], UUID.fromString(uuid));
|
||||
}
|
||||
|
||||
public static EntityId getByTypeAndUuid(int type, UUID uuid) {
|
||||
return getByTypeAndUuid(EntityType.values()[type], uuid);
|
||||
}
|
||||
|
||||
public static EntityId getByTypeAndUuid(String type, String uuid) {
|
||||
return getByTypeAndUuid(EntityType.valueOf(type), UUID.fromString(uuid));
|
||||
}
|
||||
|
||||
@ -21,5 +21,6 @@ import java.io.Serializable;
|
||||
* @author Andrew Shvayka
|
||||
*/
|
||||
public enum ComponentLifecycleEvent implements Serializable {
|
||||
// In sync with ComponentLifecycleEvent proto
|
||||
CREATED, STARTED, ACTIVATED, SUSPENDED, UPDATED, STOPPED, DELETED
|
||||
}
|
||||
@ -15,8 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.common.msg.plugin;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.ToString;
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
@ -31,21 +30,12 @@ import java.util.Optional;
|
||||
/**
|
||||
* @author Andrew Shvayka
|
||||
*/
|
||||
@ToString
|
||||
@Data
|
||||
public class ComponentLifecycleMsg implements TenantAwareMsg, ToAllNodesMsg {
|
||||
@Getter
|
||||
private final TenantId tenantId;
|
||||
@Getter
|
||||
private final EntityId entityId;
|
||||
@Getter
|
||||
private final ComponentLifecycleEvent event;
|
||||
|
||||
public ComponentLifecycleMsg(TenantId tenantId, EntityId entityId, ComponentLifecycleEvent event) {
|
||||
this.tenantId = tenantId;
|
||||
this.entityId = entityId;
|
||||
this.event = event;
|
||||
}
|
||||
|
||||
public Optional<RuleChainId> getRuleChainId() {
|
||||
return entityId.getEntityType() == EntityType.RULE_CHAIN ? Optional.of((RuleChainId) entityId) : Optional.empty();
|
||||
}
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.common.msg.plugin;
|
||||
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
@ -23,8 +24,11 @@ import org.thingsboard.server.common.msg.MsgType;
|
||||
|
||||
/**
|
||||
* @author Andrew Shvayka
|
||||
* This class used only to tell local rule-node actor like 'existing.getSelfActor().tellWithHighPriority(new RuleNodeUpdatedMs( ...'
|
||||
* Never serialized to/from proto, otherwise you need to change proto mappers in ProtoUtils class
|
||||
*/
|
||||
@ToString
|
||||
@ToString(callSuper = true)
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class RuleNodeUpdatedMsg extends ComponentLifecycleMsg {
|
||||
|
||||
public RuleNodeUpdatedMsg(TenantId tenantId, EntityId entityId) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user