Moved edge rpc processing logic from tenant actor to edge service. Execute requests to edge service in a separate executor service

This commit is contained in:
Volodymyr Babak 2022-09-06 16:24:18 +03:00
parent faaf07b1ea
commit 32fcfdac92
8 changed files with 97 additions and 71 deletions

View File

@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
@ -106,7 +107,7 @@ public class AppActor extends ContextAwareActor {
case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
onToTenantActorMsg((TenantAwareMsg) msg);
onToEdgeSessionMsg((EdgeSessionMsg) msg);
break;
case SESSION_TIMEOUT_MSG:
ctx.broadcastToChildrenByType(msg, EntityType.TENANT);
@ -194,7 +195,7 @@ public class AppActor extends ContextAwareActor {
() -> new TenantActor.ActorCreator(systemContext, tenantId));
}
private void onToTenantActorMsg(TenantAwareMsg msg) {
private void onToEdgeSessionMsg(EdgeSessionMsg msg) {
TbActorRef target = null;
if (ModelConstants.SYSTEM_TENANT.equals(msg.getTenantId())) {
log.warn("Message has system tenant id: {}", msg);
@ -204,7 +205,7 @@ public class AppActor extends ContextAwareActor {
if (target != null) {
target.tellWithHighPriority(msg);
} else {
log.debug("[{}] Invalid edge event update msg: {}", msg.getTenantId(), msg);
log.debug("[{}] Invalid edge session msg: {}", msg.getTenantId(), msg);
}
}

View File

@ -47,9 +47,7 @@ import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
@ -171,7 +169,7 @@ public class TenantActor extends RuleChainManagerActor {
case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
onToEdgeSessionMsg(msg);
onToEdgeSessionMsg((EdgeSessionMsg) msg);
break;
default:
return false;
@ -275,24 +273,8 @@ public class TenantActor extends RuleChainManagerActor {
() -> new DeviceActorCreator(systemContext, tenantId, deviceId));
}
private void onToEdgeSessionMsg(TbActorMsg msg) {
switch (msg.getMsgType()) {
case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
EdgeEventUpdateMsg edgeEventUpdateMsg = (EdgeEventUpdateMsg) msg;
log.trace("[{}] onToEdgeSessionMsg [{}]", edgeEventUpdateMsg.getTenantId(), msg);
systemContext.getEdgeRpcService().onEdgeEvent(tenantId, edgeEventUpdateMsg.getEdgeId());
break;
case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
ToEdgeSyncRequest toEdgeSyncRequest = (ToEdgeSyncRequest) msg;
log.trace("[{}] toEdgeSyncRequest [{}]", toEdgeSyncRequest.getTenantId(), msg);
systemContext.getEdgeRpcService().startSyncProcess(tenantId, toEdgeSyncRequest.getEdgeId(), toEdgeSyncRequest.getId());
break;
case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
FromEdgeSyncResponse fromEdgeSyncResponse = (FromEdgeSyncResponse) msg;
log.trace("[{}] fromEdgeSyncResponse [{}]", fromEdgeSyncResponse.getTenantId(), msg);
systemContext.getEdgeRpcService().processSyncResponse(fromEdgeSyncResponse);
break;
}
private void onToEdgeSessionMsg(EdgeSessionMsg msg) {
systemContext.getEdgeRpcService().onToEdgeSessionMsg(tenantId, msg);
}
private ApiUsageState getApiUsageState() {

View File

@ -35,6 +35,8 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.gen.edge.v1.EdgeRpcServiceGrpc;
@ -113,7 +115,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private ScheduledExecutorService sendDownlinkExecutorService;
private ScheduledExecutorService syncScheduler;
private ScheduledExecutorService executorService;
@PostConstruct
public void init() {
@ -140,9 +142,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
log.error("Failed to start Edge RPC server!", e);
throw new RuntimeException("Failed to start Edge RPC server!");
}
this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler"));
this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-event-check-scheduler"));
this.sendDownlinkExecutorService = Executors.newScheduledThreadPool(sendSchedulerPoolSize, ThingsBoardThreadFactory.forName("edge-send-scheduler"));
this.syncScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("edge-sync-scheduler"));
this.executorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("edge-service"));
log.info("Edge RPC service initialized!");
}
@ -165,6 +167,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
if (sendDownlinkExecutorService != null) {
sendDownlinkExecutorService.shutdownNow();
}
if (executorService != null) {
executorService.shutdownNow();
}
}
@Override
@ -172,8 +177,32 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService).getInputStream();
}
@Override
public void onToEdgeSessionMsg(TenantId tenantId, EdgeSessionMsg msg) {
executorService.execute(() -> {
switch (msg.getMsgType()) {
case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
EdgeEventUpdateMsg edgeEventUpdateMsg = (EdgeEventUpdateMsg) msg;
log.trace("[{}] onToEdgeSessionMsg [{}]", edgeEventUpdateMsg.getTenantId(), msg);
onEdgeEvent(tenantId, edgeEventUpdateMsg.getEdgeId());
break;
case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
ToEdgeSyncRequest toEdgeSyncRequest = (ToEdgeSyncRequest) msg;
log.trace("[{}] toEdgeSyncRequest [{}]", toEdgeSyncRequest.getTenantId(), msg);
startSyncProcess(tenantId, toEdgeSyncRequest.getEdgeId(), toEdgeSyncRequest.getId());
break;
case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
FromEdgeSyncResponse fromEdgeSyncResponse = (FromEdgeSyncResponse) msg;
log.trace("[{}] fromEdgeSyncResponse [{}]", fromEdgeSyncResponse.getTenantId(), msg);
processSyncResponse(fromEdgeSyncResponse);
break;
}
});
}
@Override
public void updateEdge(TenantId tenantId, Edge edge) {
executorService.execute(() -> {
EdgeGrpcSession session = sessions.get(edge.getId());
if (session != null && session.isConnected()) {
log.debug("[{}] Updating configuration for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
@ -181,10 +210,12 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
} else {
log.debug("[{}] Session doesn't exist for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
}
});
}
@Override
public void deleteEdge(TenantId tenantId, EdgeId edgeId) {
executorService.execute(() -> {
EdgeGrpcSession session = sessions.get(edgeId);
if (session != null && session.isConnected()) {
log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId);
@ -199,10 +230,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
}
cancelScheduleEdgeEventsCheck(edgeId);
}
});
}
@Override
public void onEdgeEvent(TenantId tenantId, EdgeId edgeId) {
private void onEdgeEvent(TenantId tenantId, EdgeId edgeId) {
EdgeGrpcSession session = sessions.get(edgeId);
if (session != null && session.isConnected()) {
log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId.getId());
@ -235,8 +266,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
scheduleEdgeEventsCheck(edgeGrpcSession);
}
@Override
public void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId) {
private void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId) {
EdgeGrpcSession session = sessions.get(edgeId);
if (session != null) {
boolean success = false;
@ -259,7 +289,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private void scheduleSyncRequestTimeout(ToEdgeSyncRequest request, UUID requestId) {
log.trace("[{}] scheduling sync edge request", requestId);
syncScheduler.schedule(() -> {
executorService.schedule(() -> {
log.trace("[{}] checking if sync edge request is not processed...", requestId);
Consumer<FromEdgeSyncResponse> consumer = localSyncEdgeRequests.remove(requestId);
if (consumer != null) {
@ -269,8 +299,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
}, 10, TimeUnit.SECONDS);
}
@Override
public void processSyncResponse(FromEdgeSyncResponse response) {
private void processSyncResponse(FromEdgeSyncResponse response) {
log.trace("[{}] Received response from sync service: [{}]", response.getId(), response);
UUID requestId = response.getId();
Consumer<FromEdgeSyncResponse> consumer = localSyncEdgeRequests.remove(requestId);

View File

@ -18,23 +18,19 @@ package org.thingsboard.server.service.edge.rpc;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import java.util.UUID;
import java.util.function.Consumer;
public interface EdgeRpcService {
void onToEdgeSessionMsg(TenantId tenantId, EdgeSessionMsg msg);
void updateEdge(TenantId tenantId, Edge edge);
void deleteEdge(TenantId tenantId, EdgeId edgeId);
void onEdgeEvent(TenantId tenantId, EdgeId edgeId);
void startSyncProcess(TenantId tenantId, EdgeId edgeId, UUID requestId);
void processSyncRequest(ToEdgeSyncRequest request, Consumer<FromEdgeSyncResponse> responseConsumer);
void processSyncResponse(FromEdgeSyncResponse response);
}

View File

@ -20,11 +20,9 @@ import lombok.ToString;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
@ToString
public class EdgeEventUpdateMsg implements TenantAwareMsg, ToAllNodesMsg {
public class EdgeEventUpdateMsg implements EdgeSessionMsg {
@Getter
private final TenantId tenantId;
@Getter

View File

@ -0,0 +1,24 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.edge;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import java.io.Serializable;
public interface EdgeSessionMsg extends TenantAwareMsg, ToAllNodesMsg {
}

View File

@ -20,14 +20,12 @@ import lombok.Getter;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import java.util.UUID;
@AllArgsConstructor
@Getter
public class FromEdgeSyncResponse implements TenantAwareMsg, ToAllNodesMsg {
public class FromEdgeSyncResponse implements EdgeSessionMsg {
private final UUID id;
private final TenantId tenantId;

View File

@ -20,14 +20,12 @@ import lombok.Getter;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
import java.util.UUID;
@AllArgsConstructor
@Getter
public class ToEdgeSyncRequest implements TenantAwareMsg, ToAllNodesMsg {
public class ToEdgeSyncRequest implements EdgeSessionMsg {
private final UUID id;
private final TenantId tenantId;
private final EdgeId edgeId;