Improved Edge handling in case connect/disconnect. Speed up initial setup

This commit is contained in:
Volodymyr Babak 2025-02-10 18:42:25 +02:00
parent 3eaa885f89
commit 3860d79613
8 changed files with 43 additions and 24 deletions

View File

@ -43,19 +43,6 @@ volumes:
Execute the following command to start upgrade process:
```bash
docker compose -f docker-compose-upgrade.yml up
docker compose -f docker-compose-upgrade.yml up --abort-on-container-exit
{:copy-code}
```
Once upgrade process successfully completed, exit from the docker-compose shell by this combination:
```text
Ctrl + C
```
Execute the following command to stop TB Edge upgrade container:
```bash
docker compose -f docker-compose-upgrade.yml stop
{:copy-code}
```
```

View File

@ -404,6 +404,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private void scheduleEdgeEventsCheck(EdgeGrpcSession session) {
EdgeId edgeId = session.getEdge().getId();
TenantId tenantId = session.getEdge().getTenantId();
cancelScheduleEdgeEventsCheck(edgeId);
if (sessions.containsKey(edgeId)) {
ScheduledFuture<?> edgeEventCheckTask = edgeEventProcessingExecutorService.schedule(() -> {
try {

View File

@ -448,7 +448,11 @@ public abstract class EdgeGrpcSession implements Closeable {
private void scheduleDownlinkMsgsPackSend(int attempt) {
Runnable sendDownlinkMsgsTask = () -> {
try {
if (isConnected() && !sessionState.getPendingMsgsMap().values().isEmpty()) {
if (!isConnected()) {
stopCurrentSendDownlinkMsgsTask(true);
return;
}
if (!sessionState.getPendingMsgsMap().values().isEmpty()) {
List<DownlinkMsg> copy = new ArrayList<>(sessionState.getPendingMsgsMap().values());
if (attempt > 1) {
String error = "Failed to deliver the batch";
@ -525,11 +529,11 @@ public abstract class EdgeGrpcSession implements Closeable {
private void onDownlinkResponse(DownlinkResponseMsg msg) {
try {
if (msg.getSuccess()) {
sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
log.debug("[{}][{}][{}] Msg has been processed successfully! Msg Id: [{}], Msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg);
} else {
log.error("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", tenantId, edge.getId(), sessionId, msg.getDownlinkMsgId(), msg.getErrorMsg());
}
sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
if (sessionState.getPendingMsgsMap().isEmpty()) {
log.debug("[{}][{}][{}] Pending msgs map is empty. Stopping current iteration", tenantId, edge.getId(), sessionId);
stopCurrentSendDownlinkMsgsTask(false);

View File

@ -41,7 +41,10 @@ public abstract class BaseDashboardProcessor extends BaseEdgeProcessor {
if (dashboard == null) {
throw new RuntimeException("[{" + tenantId + "}] dashboardUpdateMsg {" + dashboardUpdateMsg + "} cannot be converted to dashboard");
}
Set<ShortCustomerInfo> newAssignedCustomers = new HashSet<>(dashboard.getAssignedCustomers());
Set<ShortCustomerInfo> newAssignedCustomers = new HashSet<>();
if (dashboard.getAssignedCustomers() != null && !dashboard.getAssignedCustomers().isEmpty()) {
newAssignedCustomers.addAll(dashboard.getAssignedCustomers());
}
Dashboard dashboardById = edgeCtx.getDashboardService().findDashboardById(tenantId, dashboardId);
if (dashboardById == null) {
created = true;

View File

@ -28,17 +28,22 @@ import org.thingsboard.server.gen.edge.v1.WidgetBundleTypesRequestMsg;
public interface EdgeRequestsService {
@Deprecated(since = "3.9.1", forRemoval = true)
ListenableFuture<Void> processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg);
ListenableFuture<Void> processAttributesRequestMsg(TenantId tenantId, Edge edge, AttributesRequestMsg attributesRequestMsg);
ListenableFuture<Void> processRelationRequestMsg(TenantId tenantId, Edge edge, RelationRequestMsg relationRequestMsg);
@Deprecated(since = "3.9.1", forRemoval = true)
ListenableFuture<Void> processDeviceCredentialsRequestMsg(TenantId tenantId, Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg);
@Deprecated(since = "3.9.1", forRemoval = true)
ListenableFuture<Void> processUserCredentialsRequestMsg(TenantId tenantId, Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg);
@Deprecated(since = "3.9.1", forRemoval = true)
ListenableFuture<Void> processWidgetBundleTypesRequestMsg(TenantId tenantId, Edge edge, WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg);
@Deprecated(since = "3.9.1", forRemoval = true)
ListenableFuture<Void> processEntityViewsRequestMsg(TenantId tenantId, Edge edge, EntityViewsRequestMsg entityViewsRequestMsg);
}

View File

@ -48,12 +48,14 @@ public class DashboardEdgeTest extends AbstractEdgeTest {
private static final int MOBILE_ORDER = 5;
private static final String IMAGE = "";
private static final String DASHBOARD_TITLE = "Edge Test Dashboard";
@Test
public void testDashboards() throws Exception {
// create dashboard and assign to edge
edgeImitator.expectMessageAmount(2);
Dashboard dashboard = new Dashboard();
dashboard.setTitle("Edge Test Dashboard");
dashboard.setTitle(DASHBOARD_TITLE);
dashboard.setMobileHide(true);
dashboard.setImage(IMAGE);
dashboard.setMobileOrder(MOBILE_ORDER);
@ -200,12 +202,27 @@ public class DashboardEdgeTest extends AbstractEdgeTest {
Dashboard foundDashboard = doGet("/api/dashboard/" + dashboard.getUuidId(), Dashboard.class);
Assert.assertNotNull(foundDashboard);
Assert.assertEquals("Edge Test Dashboard", foundDashboard.getName());
Assert.assertEquals(DASHBOARD_TITLE, foundDashboard.getName());
PageData<DashboardInfo> pageData = doGetTypedWithPageLink("/api/customer/" + savedCustomer.getId().toString() + "/dashboards?",
new TypeReference<>() {}, new PageLink(100));
Assert.assertEquals(1, pageData.getData().size());
Assert.assertEquals("Edge Test Dashboard", pageData.getData().get(0).getTitle());
Assert.assertEquals(DASHBOARD_TITLE, pageData.getData().get(0).getTitle());
dashboard.setTitle(DASHBOARD_TITLE + " Updated");
dashboard.setAssignedCustomers(null);
dashboardUpdateMsgBuilder.setEntity(JacksonUtil.toString(dashboard));
dashboardUpdateMsgBuilder.setMsgType(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE);
uplinkMsgBuilder = UplinkMsg.newBuilder();
uplinkMsgBuilder.addDashboardUpdateMsg(dashboardUpdateMsgBuilder.build());
edgeImitator.expectResponsesAmount(1);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
Assert.assertTrue(edgeImitator.waitForResponses());
foundDashboard = doGet("/api/dashboard/" + dashboard.getUuidId(), Dashboard.class);
Assert.assertEquals(DASHBOARD_TITLE + " Updated", foundDashboard.getName());
}
@Test
@ -256,7 +273,7 @@ public class DashboardEdgeTest extends AbstractEdgeTest {
Dashboard dashboard = new Dashboard();
dashboard.setId(new DashboardId(UUID.randomUUID()));
dashboard.setTenantId(tenantId);
dashboard.setTitle("Edge Test Dashboard");
dashboard.setTitle(DASHBOARD_TITLE);
dashboard.setAssignedCustomers(Sets.newHashSet(new ShortCustomerInfo(savedCustomer.getId(), savedCustomer.getTitle(), savedCustomer.isPublic())));
return dashboard;
}

View File

@ -472,7 +472,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId));
consumerBuilder.clientId("monolith-to-edge-event-consumer" + serviceInfoProvider.getServiceId());
consumerBuilder.clientId("monolith-to-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(edgeEventAdmin);

View File

@ -421,7 +421,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId));
consumerBuilder.clientId("tb-core-edge-event-consumer" + serviceInfoProvider.getServiceId());
consumerBuilder.clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer"));
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(edgeEventAdmin);