Edge events are processed in a separate threads to avoid blocking core consumer threads

This commit is contained in:
Volodymyr Babak 2023-10-16 12:54:10 +03:00
parent 94e935636e
commit fb5c593bb5
2 changed files with 92 additions and 80 deletions

View File

@ -22,10 +22,11 @@ import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEventType;
@ -59,7 +60,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Service
@TbCoreComponent
@ -128,17 +129,20 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
@Autowired
protected ApplicationEventPublisher eventPublisher;
private ExecutorService dbCallBackExecutor;
@Value("${actors.system.edge_dispatcher_pool_size:4}")
private int edgeDispatcherSize;
private ExecutorService executor;
@PostConstruct
public void initExecutor() {
dbCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-notifications"));
executor = ThingsBoardExecutors.newWorkStealingPool(edgeDispatcherSize, "edge-notifications");
}
@PreDestroy
public void shutdownExecutor() {
if (dbCallBackExecutor != null) {
dbCallBackExecutor.shutdownNow();
if (executor != null) {
executor.shutdownNow();
}
}
@ -157,82 +161,89 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
public void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback) {
TenantId tenantId = TenantId.fromUUID(new UUID(edgeNotificationMsg.getTenantIdMSB(), edgeNotificationMsg.getTenantIdLSB()));
log.debug("[{}] Pushing notification to edge {}", tenantId, edgeNotificationMsg);
try {
EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType());
ListenableFuture<Void> future;
switch (type) {
case EDGE:
future = edgeProcessor.processEdgeNotification(tenantId, edgeNotificationMsg);
break;
case ASSET:
future = assetProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case DEVICE:
future = deviceProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case ENTITY_VIEW:
future = entityViewProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case DASHBOARD:
future = dashboardProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case RULE_CHAIN:
future = ruleChainProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case USER:
future = userProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case CUSTOMER:
future = customerProcessor.processCustomerNotification(tenantId, edgeNotificationMsg);
break;
case DEVICE_PROFILE:
future = deviceProfileProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case ASSET_PROFILE:
future = assetProfileProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case OTA_PACKAGE:
future = otaPackageProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case WIDGETS_BUNDLE:
future = widgetBundleProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case WIDGET_TYPE:
future = widgetTypeProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case QUEUE:
future = queueProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case ALARM:
future = alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg);
break;
case RELATION:
future = relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg);
break;
case TENANT:
future = tenantEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case TENANT_PROFILE:
future = tenantProfileEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
default:
log.warn("[{}] Edge event type [{}] is not designed to be pushed to edge", tenantId, type);
future = Futures.immediateFuture(null);
}
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void unused) {
callback.onSuccess();
final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
executor.submit(() -> {
try {
if (deadline < System.nanoTime()) {
log.warn("[{}] Skipping notification message because deadline reached {}", tenantId, edgeNotificationMsg);
return;
}
EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType());
ListenableFuture<Void> future;
switch (type) {
case EDGE:
future = edgeProcessor.processEdgeNotification(tenantId, edgeNotificationMsg);
break;
case ASSET:
future = assetProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case DEVICE:
future = deviceProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case ENTITY_VIEW:
future = entityViewProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case DASHBOARD:
future = dashboardProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case RULE_CHAIN:
future = ruleChainProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case USER:
future = userProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case CUSTOMER:
future = customerProcessor.processCustomerNotification(tenantId, edgeNotificationMsg);
break;
case DEVICE_PROFILE:
future = deviceProfileProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case ASSET_PROFILE:
future = assetProfileProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case OTA_PACKAGE:
future = otaPackageProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case WIDGETS_BUNDLE:
future = widgetBundleProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case WIDGET_TYPE:
future = widgetTypeProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case QUEUE:
future = queueProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case ALARM:
future = alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg);
break;
case RELATION:
future = relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg);
break;
case TENANT:
future = tenantEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
case TENANT_PROFILE:
future = tenantProfileEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
default:
log.warn("[{}] Edge event type [{}] is not designed to be pushed to edge", tenantId, type);
future = Futures.immediateFuture(null);
}
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void unused) {
callback.onSuccess();
}
@Override
public void onFailure(Throwable throwable) {
callBackFailure(tenantId, edgeNotificationMsg, callback, throwable);
}
}, dbCallBackExecutor);
} catch (Exception e) {
callBackFailure(tenantId, edgeNotificationMsg, callback, e);
}
@Override
public void onFailure(Throwable throwable) {
callBackFailure(tenantId, edgeNotificationMsg, callback, throwable);
}
}, executor);
} catch (Exception e) {
callBackFailure(tenantId, edgeNotificationMsg, callback, e);
}
});
}
private void callBackFailure(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback, Throwable throwable) {

View File

@ -364,6 +364,7 @@ actors:
tenant_dispatcher_pool_size: "${ACTORS_SYSTEM_TENANT_DISPATCHER_POOL_SIZE:2}"
device_dispatcher_pool_size: "${ACTORS_SYSTEM_DEVICE_DISPATCHER_POOL_SIZE:4}"
rule_dispatcher_pool_size: "${ACTORS_SYSTEM_RULE_DISPATCHER_POOL_SIZE:8}"
edge_dispatcher_pool_size: "${ACTORS_SYSTEM_EDGE_DISPATCHER_POOL_SIZE:4}"
tenant:
create_components_on_init: "${ACTORS_TENANT_CREATE_COMPONENTS_ON_INIT:true}"
session: