Fix message order for Gateway and LwM2M transports in the core consumer

This commit is contained in:
Andrii Shvaika 2021-05-26 16:46:36 +03:00 committed by Andrew Shvayka
parent e9f5fd2706
commit 25143378c2
8 changed files with 47 additions and 21 deletions

View File

@ -63,6 +63,7 @@ import org.thingsboard.server.service.edge.EdgeNotificationService;
import org.thingsboard.server.service.firmware.FirmwareStateService; import org.thingsboard.server.service.firmware.FirmwareStateService;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
@ -74,6 +75,7 @@ import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWra
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@ -198,14 +200,17 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
if (msgs.isEmpty()) { if (msgs.isEmpty()) {
continue; continue;
} }
ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> pendingMap = msgs.stream().collect( List<IdMsgPair<ToCoreMsg>> orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).collect(Collectors.toList());
Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> pendingMap = orderedMsgList.stream().collect(
Collectors.toConcurrentMap(IdMsgPair::getUuid, IdMsgPair::getMsg));
CountDownLatch processingTimeoutLatch = new CountDownLatch(1); CountDownLatch processingTimeoutLatch = new CountDownLatch(1);
TbPackProcessingContext<TbProtoQueueMsg<ToCoreMsg>> ctx = new TbPackProcessingContext<>( TbPackProcessingContext<TbProtoQueueMsg<ToCoreMsg>> ctx = new TbPackProcessingContext<>(
processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>()); processingTimeoutLatch, pendingMap, new ConcurrentHashMap<>());
PendingMsgHolder pendingMsgHolder = new PendingMsgHolder(); PendingMsgHolder pendingMsgHolder = new PendingMsgHolder();
Future<?> packSubmitFuture = consumersExecutor.submit(() -> { Future<?> packSubmitFuture = consumersExecutor.submit(() -> {
pendingMap.forEach((id, msg) -> { orderedMsgList.forEach((element) -> {
UUID id = element.getUuid();
TbProtoQueueMsg<ToCoreMsg> msg = element.getMsg();
log.trace("[{}] Creating main callback for message: {}", id, msg.getValue()); log.trace("[{}] Creating main callback for message: {}", id, msg.getValue());
TbCallback callback = new TbPackCallback<>(id, ctx); TbCallback callback = new TbPackCallback<>(id, ctx);
try { try {
@ -223,7 +228,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreMsg.hasEdgeNotificationMsg()) { } else if (toCoreMsg.hasEdgeNotificationMsg()) {
log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg()); log.trace("[{}] Forwarding message to edge service {}", id, toCoreMsg.getEdgeNotificationMsg());
forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback); forwardToEdgeNotificationService(toCoreMsg.getEdgeNotificationMsg(), callback);
} else if (toCoreMsg.getToDeviceActorNotificationMsg() != null && !toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) { } else if (!toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) {
Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
if (actorMsg.isPresent()) { if (actorMsg.isPresent()) {
TbActorMsg tbActorMsg = actorMsg.get(); TbActorMsg tbActorMsg = actorMsg.get();

View File

@ -27,7 +27,7 @@ import java.util.stream.Collectors;
public abstract class AbstractTbRuleEngineSubmitStrategy implements TbRuleEngineSubmitStrategy { public abstract class AbstractTbRuleEngineSubmitStrategy implements TbRuleEngineSubmitStrategy {
protected final String queueName; protected final String queueName;
protected List<IdMsgPair> orderedMsgList; protected List<IdMsgPair<TransportProtos.ToRuleEngineMsg>> orderedMsgList;
private volatile boolean stopped; private volatile boolean stopped;
public AbstractTbRuleEngineSubmitStrategy(String queueName) { public AbstractTbRuleEngineSubmitStrategy(String queueName) {
@ -38,7 +38,7 @@ public abstract class AbstractTbRuleEngineSubmitStrategy implements TbRuleEngine
@Override @Override
public void init(List<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgs) { public void init(List<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgs) {
orderedMsgList = msgs.stream().map(msg -> new IdMsgPair(UUID.randomUUID(), msg)).collect(Collectors.toList()); orderedMsgList = msgs.stream().map(msg -> new IdMsgPair<>(UUID.randomUUID(), msg)).collect(Collectors.toList());
} }
@Override @Override
@ -48,8 +48,8 @@ public abstract class AbstractTbRuleEngineSubmitStrategy implements TbRuleEngine
@Override @Override
public void update(ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> reprocessMap) { public void update(ConcurrentMap<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> reprocessMap) {
List<IdMsgPair> newOrderedMsgList = new ArrayList<>(reprocessMap.size()); List<IdMsgPair<TransportProtos.ToRuleEngineMsg>> newOrderedMsgList = new ArrayList<>(reprocessMap.size());
for (IdMsgPair pair : orderedMsgList) { for (IdMsgPair<TransportProtos.ToRuleEngineMsg> pair : orderedMsgList) {
if (reprocessMap.containsKey(pair.uuid)) { if (reprocessMap.containsKey(pair.uuid)) {
newOrderedMsgList.add(pair); newOrderedMsgList.add(pair);
} }

View File

@ -15,16 +15,18 @@
*/ */
package org.thingsboard.server.service.queue.processing; package org.thingsboard.server.service.queue.processing;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import lombok.Getter;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import java.util.UUID; import java.util.UUID;
public class IdMsgPair { public class IdMsgPair<T extends com.google.protobuf.GeneratedMessageV3> {
@Getter
final UUID uuid; final UUID uuid;
final TbProtoQueueMsg<ToRuleEngineMsg> msg; @Getter
final TbProtoQueueMsg<T> msg;
public IdMsgPair(UUID uuid, TbProtoQueueMsg<ToRuleEngineMsg> msg) { public IdMsgPair(UUID uuid, TbProtoQueueMsg<T> msg) {
this.uuid = uuid; this.uuid = uuid;
this.msg = msg; this.msg = msg;
} }

View File

@ -33,7 +33,7 @@ public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends Abs
private volatile BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer; private volatile BiConsumer<UUID, TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> msgConsumer;
private volatile ConcurrentMap<UUID, EntityId> msgToEntityIdMap = new ConcurrentHashMap<>(); private volatile ConcurrentMap<UUID, EntityId> msgToEntityIdMap = new ConcurrentHashMap<>();
private volatile ConcurrentMap<EntityId, Queue<IdMsgPair>> entityIdToListMap = new ConcurrentHashMap<>(); private volatile ConcurrentMap<EntityId, Queue<IdMsgPair<TransportProtos.ToRuleEngineMsg>>> entityIdToListMap = new ConcurrentHashMap<>();
public SequentialByEntityIdTbRuleEngineSubmitStrategy(String queueName) { public SequentialByEntityIdTbRuleEngineSubmitStrategy(String queueName) {
super(queueName); super(queueName);
@ -66,7 +66,7 @@ public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends Abs
protected void doOnSuccess(UUID id) { protected void doOnSuccess(UUID id) {
EntityId entityId = msgToEntityIdMap.get(id); EntityId entityId = msgToEntityIdMap.get(id);
if (entityId != null) { if (entityId != null) {
Queue<IdMsgPair> queue = entityIdToListMap.get(entityId); Queue<IdMsgPair<TransportProtos.ToRuleEngineMsg>> queue = entityIdToListMap.get(entityId);
if (queue != null) { if (queue != null) {
IdMsgPair next = null; IdMsgPair next = null;
synchronized (queue) { synchronized (queue) {
@ -86,7 +86,7 @@ public abstract class SequentialByEntityIdTbRuleEngineSubmitStrategy extends Abs
private void initMaps() { private void initMaps() {
msgToEntityIdMap.clear(); msgToEntityIdMap.clear();
entityIdToListMap.clear(); entityIdToListMap.clear();
for (IdMsgPair pair : orderedMsgList) { for (IdMsgPair<TransportProtos.ToRuleEngineMsg> pair : orderedMsgList) {
EntityId entityId = getEntityId(pair.msg.getValue()); EntityId entityId = getEntityId(pair.msg.getValue());
if (entityId != null) { if (entityId != null) {
msgToEntityIdMap.put(pair.uuid, entityId); msgToEntityIdMap.put(pair.uuid, entityId);

View File

@ -182,9 +182,13 @@ public class DefaultLwM2MTransportMsgHandler implements LwM2mTransportMsgHandler
SessionInfoProto sessionInfo = this.getSessionInfoOrCloseSession(lwM2MClient); SessionInfoProto sessionInfo = this.getSessionInfoOrCloseSession(lwM2MClient);
if (sessionInfo != null) { if (sessionInfo != null) {
transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, sessionInfo)); transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(this, sessionInfo));
transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), null); TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder()
transportService.process(sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build(), null); .setSessionInfo(sessionInfo)
transportService.process(sessionInfo, TransportProtos.SubscribeToRPCMsg.newBuilder().build(), null); .setSessionEvent(DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN))
.setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build())
.setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder().build())
.build();
transportService.process(msg, null);
this.getInfoFirmwareUpdate(lwM2MClient); this.getInfoFirmwareUpdate(lwM2MClient);
this.getInfoSoftwareUpdate(lwM2MClient); this.getInfoSoftwareUpdate(lwM2MClient);
this.initLwM2mFromClientValue(registration, lwM2MClient); this.initLwM2mFromClientValue(registration, lwM2MClient);

View File

@ -256,9 +256,12 @@ public class GatewaySessionHandler {
log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType); log.trace("[{}] First got or created device [{}], type [{}] for the gateway session", sessionId, deviceName, deviceType);
SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo(); SessionInfoProto deviceSessionInfo = deviceSessionCtx.getSessionInfo();
transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx);
transportService.process(deviceSessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder()
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), null); .setSessionInfo(deviceSessionInfo)
transportService.process(deviceSessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), null); .setSessionEvent(DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN))
.setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().build())
.setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder().build())
.build(), null);
} }
futureToSet.set(devices.get(deviceName)); futureToSet.set(devices.get(deviceName));
deviceFutures.remove(deviceName); deviceFutures.remove(deviceName);

View File

@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse;
import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse;
import org.thingsboard.server.common.transport.service.SessionMetaData; import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceCredentialsRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetDeviceCredentialsRequestMsg;
@ -112,6 +113,8 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback); void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback);
void process(TransportToDeviceActorMsg msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfoProto, GetFirmwareRequestMsg msg, TransportServiceCallback<GetFirmwareResponseMsg> callback); void process(SessionInfoProto sessionInfoProto, GetFirmwareRequestMsg msg, TransportServiceCallback<GetFirmwareResponseMsg> callback);
SessionMetaData registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener); SessionMetaData registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener);

View File

@ -456,6 +456,15 @@ public class DefaultTransportService implements TransportService {
} }
} }
@Override
public void process(TransportToDeviceActorMsg msg, TransportServiceCallback<Void> callback) {
TransportProtos.SessionInfoProto sessionInfo = msg.getSessionInfo();
if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
sendToDeviceActor(sessionInfo, msg, callback);
}
}
@Override @Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) { public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
int dataPoints = 0; int dataPoints = 0;