EDQS: introduce API auto-enable option
This commit is contained in:
		
							parent
							
								
									ddc9f6f8c7
								
							
						
					
					
						commit
						32a4a7ce35
					
				@ -22,6 +22,7 @@ import jakarta.annotation.PostConstruct;
 | 
				
			|||||||
import jakarta.annotation.PreDestroy;
 | 
					import jakarta.annotation.PreDestroy;
 | 
				
			||||||
import lombok.RequiredArgsConstructor;
 | 
					import lombok.RequiredArgsConstructor;
 | 
				
			||||||
import lombok.extern.slf4j.Slf4j;
 | 
					import lombok.extern.slf4j.Slf4j;
 | 
				
			||||||
 | 
					import org.springframework.beans.factory.annotation.Value;
 | 
				
			||||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
					import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
 | 
				
			||||||
import org.springframework.stereotype.Service;
 | 
					import org.springframework.stereotype.Service;
 | 
				
			||||||
import org.thingsboard.common.util.JacksonUtil;
 | 
					import org.thingsboard.common.util.JacksonUtil;
 | 
				
			||||||
@ -43,13 +44,16 @@ import java.util.UUID;
 | 
				
			|||||||
@Service
 | 
					@Service
 | 
				
			||||||
@Slf4j
 | 
					@Slf4j
 | 
				
			||||||
@RequiredArgsConstructor
 | 
					@RequiredArgsConstructor
 | 
				
			||||||
@ConditionalOnExpression("'${queue.edqs.api_enabled:true}' == 'true' && ('${service.type:null}' == 'monolith' || '${service.type:null}' == 'tb-core')")
 | 
					@ConditionalOnExpression("'${queue.edqs.api.supported:true}' == 'true' && ('${service.type:null}' == 'monolith' || '${service.type:null}' == 'tb-core')")
 | 
				
			||||||
public class DefaultEdqsApiService implements EdqsApiService {
 | 
					public class DefaultEdqsApiService implements EdqsApiService {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private final EdqsPartitionService edqsPartitionService;
 | 
					    private final EdqsPartitionService edqsPartitionService;
 | 
				
			||||||
    private final EdqsClientQueueFactory queueFactory;
 | 
					    private final EdqsClientQueueFactory queueFactory;
 | 
				
			||||||
    private TbQueueRequestTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> requestTemplate;
 | 
					    private TbQueueRequestTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> requestTemplate;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Value("${queue.edqs.api.auto_enable:true}")
 | 
				
			||||||
 | 
					    private boolean autoEnable;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private Boolean apiEnabled = null;
 | 
					    private Boolean apiEnabled = null;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @PostConstruct
 | 
					    @PostConstruct
 | 
				
			||||||
@ -100,6 +104,11 @@ public class DefaultEdqsApiService implements EdqsApiService {
 | 
				
			|||||||
        return true;
 | 
					        return true;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public boolean isAutoEnable() {
 | 
				
			||||||
 | 
					        return autoEnable;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @PreDestroy
 | 
					    @PreDestroy
 | 
				
			||||||
    private void stop() {
 | 
					    private void stop() {
 | 
				
			||||||
        requestTemplate.stop();
 | 
					        requestTemplate.stop();
 | 
				
			||||||
 | 
				
			|||||||
@ -118,7 +118,7 @@ public class DefaultEdqsService implements EdqsService {
 | 
				
			|||||||
                                .syncRequest(new EdqsSyncRequest())
 | 
					                                .syncRequest(new EdqsSyncRequest())
 | 
				
			||||||
                                .build());
 | 
					                                .build());
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                } else if (edqsApiService.isSupported()) {
 | 
					                } else if (edqsApiService.isSupported() && edqsApiService.isAutoEnable()) {
 | 
				
			||||||
                    // only if topic/RocksDB is not empty and sync is finished
 | 
					                    // only if topic/RocksDB is not empty and sync is finished
 | 
				
			||||||
                    edqsApiService.setEnabled(true);
 | 
					                    edqsApiService.setEnabled(true);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
@ -159,11 +159,15 @@ public class DefaultEdqsService implements EdqsService {
 | 
				
			|||||||
                        edqsSyncService.sync();
 | 
					                        edqsSyncService.sync();
 | 
				
			||||||
                        saveSyncState(EdqsSyncStatus.FINISHED);
 | 
					                        saveSyncState(EdqsSyncStatus.FINISHED);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        if (edqsApiService.isSupported()) {
 | 
					                        if (edqsApiService.isSupported())
 | 
				
			||||||
                            broadcast(ToCoreEdqsMsg.builder()
 | 
					                            if (edqsApiService.isAutoEnable()) {
 | 
				
			||||||
                                    .apiEnabled(Boolean.TRUE)
 | 
					                                log.info("EDQS sync is finished, auto-enabling API");
 | 
				
			||||||
                                    .build());
 | 
					                                broadcast(ToCoreEdqsMsg.builder()
 | 
				
			||||||
                        }
 | 
					                                        .apiEnabled(Boolean.TRUE)
 | 
				
			||||||
 | 
					                                        .build());
 | 
				
			||||||
 | 
					                            } else {
 | 
				
			||||||
 | 
					                                log.info("EDQS sync is finished, but leaving API disabled");
 | 
				
			||||||
 | 
					                            }
 | 
				
			||||||
                    } catch (Exception e) {
 | 
					                    } catch (Exception e) {
 | 
				
			||||||
                        log.error("Failed to complete sync", e);
 | 
					                        log.error("Failed to complete sync", e);
 | 
				
			||||||
                        saveSyncState(EdqsSyncStatus.FAILED);
 | 
					                        saveSyncState(EdqsSyncStatus.FAILED);
 | 
				
			||||||
 | 
				
			|||||||
@ -1731,8 +1731,11 @@ queue:
 | 
				
			|||||||
      entity_batch_size: "${TB_EDQS_SYNC_ENTITY_BATCH_SIZE:10000}"
 | 
					      entity_batch_size: "${TB_EDQS_SYNC_ENTITY_BATCH_SIZE:10000}"
 | 
				
			||||||
      # Batch size of timeseries data being synced with EDQS
 | 
					      # Batch size of timeseries data being synced with EDQS
 | 
				
			||||||
      ts_batch_size: "${TB_EDQS_SYNC_TS_BATCH_SIZE:10000}"
 | 
					      ts_batch_size: "${TB_EDQS_SYNC_TS_BATCH_SIZE:10000}"
 | 
				
			||||||
    # Whether to forward entity data query requests to EDQS (otherwise use PostgreSQL implementation)
 | 
					    api:
 | 
				
			||||||
    api_enabled: "${TB_EDQS_API_ENABLED:false}"
 | 
					      # Whether to forward entity data query requests to EDQS (otherwise use PostgreSQL implementation)
 | 
				
			||||||
 | 
					      supported: "${TB_EDQS_API_SUPPORTED:false}"
 | 
				
			||||||
 | 
					      # Whether to auto-enable EDQS API (if queue.edqs.api.supported is true) when sync of data to Kafka is finished 
 | 
				
			||||||
 | 
					      auto_enable: "${TB_EDQS_API_AUTO_ENABLE:true}"
 | 
				
			||||||
    # Mode of EDQS: local (for monolith) or remote (with separate EDQS microservices)
 | 
					    # Mode of EDQS: local (for monolith) or remote (with separate EDQS microservices)
 | 
				
			||||||
    mode: "${TB_EDQS_MODE:local}"
 | 
					    mode: "${TB_EDQS_MODE:local}"
 | 
				
			||||||
    local:
 | 
					    local:
 | 
				
			||||||
 | 
				
			|||||||
@ -37,7 +37,8 @@ import static org.awaitility.Awaitility.await;
 | 
				
			|||||||
//        "queue.type=kafka", // uncomment to use Kafka
 | 
					//        "queue.type=kafka", // uncomment to use Kafka
 | 
				
			||||||
//        "queue.kafka.bootstrap.servers=10.7.1.254:9092",
 | 
					//        "queue.kafka.bootstrap.servers=10.7.1.254:9092",
 | 
				
			||||||
        "queue.edqs.sync.enabled=true",
 | 
					        "queue.edqs.sync.enabled=true",
 | 
				
			||||||
        "queue.edqs.api_enabled=true",
 | 
					        "queue.edqs.api.supported=true",
 | 
				
			||||||
 | 
					        "queue.edqs.api.auto_enable=true",
 | 
				
			||||||
        "queue.edqs.mode=local"
 | 
					        "queue.edqs.mode=local"
 | 
				
			||||||
})
 | 
					})
 | 
				
			||||||
public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest {
 | 
					public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest {
 | 
				
			||||||
 | 
				
			|||||||
@ -51,7 +51,8 @@ import static org.awaitility.Awaitility.await;
 | 
				
			|||||||
@DaoSqlTest
 | 
					@DaoSqlTest
 | 
				
			||||||
@TestPropertySource(properties = {
 | 
					@TestPropertySource(properties = {
 | 
				
			||||||
        "queue.edqs.sync.enabled=true",
 | 
					        "queue.edqs.sync.enabled=true",
 | 
				
			||||||
        "queue.edqs.api_enabled=true",
 | 
					        "queue.edqs.api.supported=true",
 | 
				
			||||||
 | 
					        "queue.edqs.api.auto_enable=true",
 | 
				
			||||||
        "queue.edqs.mode=local"
 | 
					        "queue.edqs.mode=local"
 | 
				
			||||||
})
 | 
					})
 | 
				
			||||||
public class EdqsEntityServiceTest extends EntityServiceTest {
 | 
					public class EdqsEntityServiceTest extends EntityServiceTest {
 | 
				
			||||||
 | 
				
			|||||||
@ -60,4 +60,4 @@ server.log_controller_error_stack_trace=false
 | 
				
			|||||||
transport.gateway.dashboard.sync.enabled=false
 | 
					transport.gateway.dashboard.sync.enabled=false
 | 
				
			||||||
 | 
					
 | 
				
			||||||
queue.edqs.sync.enabled=false
 | 
					queue.edqs.sync.enabled=false
 | 
				
			||||||
queue.edqs.api_enabled=false
 | 
					queue.edqs.api.supported=false
 | 
				
			||||||
 | 
				
			|||||||
@ -31,4 +31,6 @@ public interface EdqsApiService {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    boolean isSupported();
 | 
					    boolean isSupported();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    boolean isAutoEnable();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -50,4 +50,9 @@ public class DummyEdqsApiService implements EdqsApiService {
 | 
				
			|||||||
        return false;
 | 
					        return false;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Override
 | 
				
			||||||
 | 
					    public boolean isAutoEnable() {
 | 
				
			||||||
 | 
					        return false;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -2,4 +2,4 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
TB_EDQS_MODE=remote
 | 
					TB_EDQS_MODE=remote
 | 
				
			||||||
TB_EDQS_SYNC_ENABLED=true
 | 
					TB_EDQS_SYNC_ENABLED=true
 | 
				
			||||||
TB_EDQS_API_ENABLED=true
 | 
					TB_EDQS_API_SUPPORTED=true
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user