Implement clean up service to clear empty edge topics
This commit is contained in:
parent
3ecbd3258e
commit
38eeada8ec
@ -20,6 +20,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
|
|||||||
import jakarta.annotation.PostConstruct;
|
import jakarta.annotation.PostConstruct;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.transaction.event.TransactionalEventListener;
|
import org.springframework.transaction.event.TransactionalEventListener;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
@ -37,6 +39,7 @@ import org.thingsboard.server.common.data.domain.Domain;
|
|||||||
import org.thingsboard.server.common.data.edge.Edge;
|
import org.thingsboard.server.common.data.edge.Edge;
|
||||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||||
|
import org.thingsboard.server.common.data.id.EdgeId;
|
||||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||||
@ -50,6 +53,8 @@ import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
|
|||||||
import org.thingsboard.server.dao.eventsourcing.RelationActionEvent;
|
import org.thingsboard.server.dao.eventsourcing.RelationActionEvent;
|
||||||
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
|
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
|
||||||
import org.thingsboard.server.dao.tenant.TenantService;
|
import org.thingsboard.server.dao.tenant.TenantService;
|
||||||
|
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||||
|
import org.thingsboard.server.queue.discovery.TopicService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This event listener does not support async event processing because relay on ThreadLocal
|
* This event listener does not support async event processing because relay on ThreadLocal
|
||||||
@ -65,14 +70,20 @@ import org.thingsboard.server.dao.tenant.TenantService;
|
|||||||
* future.addCallback(eventPublisher.publishEvent(...))
|
* future.addCallback(eventPublisher.publishEvent(...))
|
||||||
* }
|
* }
|
||||||
* */
|
* */
|
||||||
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@RequiredArgsConstructor
|
@RequiredArgsConstructor
|
||||||
@Slf4j
|
@ConditionalOnExpression("${edges.enabled:true}")
|
||||||
public class EdgeEventSourcingListener {
|
public class EdgeEventSourcingListener {
|
||||||
|
|
||||||
|
private final TopicService topicService;
|
||||||
|
private final TbQueueAdmin tbQueueAdmin;
|
||||||
|
private final TenantService tenantService;
|
||||||
private final TbClusterService tbClusterService;
|
private final TbClusterService tbClusterService;
|
||||||
private final EdgeSynchronizationManager edgeSynchronizationManager;
|
private final EdgeSynchronizationManager edgeSynchronizationManager;
|
||||||
private final TenantService tenantService;
|
|
||||||
|
@Value("#{'${queue.type:null}' == 'kafka'}")
|
||||||
|
private boolean isKafkaSupported;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
@ -106,7 +117,15 @@ public class EdgeEventSourcingListener {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (EntityType.EDGE.equals(entityType) || EntityType.TENANT.equals(entityType)) {
|
if (EntityType.EDGE.equals(entityType)) {
|
||||||
|
if (isKafkaSupported) {
|
||||||
|
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, (EdgeId) event.getEntityId()).getTopic();
|
||||||
|
tbQueueAdmin.deleteTopic(topic);
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (EntityType.TENANT.equals(entityType)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
log.trace("[{}] DeleteEntityEvent called: {}", tenantId, event);
|
log.trace("[{}] DeleteEntityEvent called: {}", tenantId, event);
|
||||||
|
|||||||
@ -102,40 +102,34 @@ import java.util.function.BiConsumer;
|
|||||||
@Data
|
@Data
|
||||||
public abstract class AbstractEdgeGrpcSession<T extends AbstractEdgeGrpcSession<T>> implements EdgeGrpcSession, Closeable {
|
public abstract class AbstractEdgeGrpcSession<T extends AbstractEdgeGrpcSession<T>> implements EdgeGrpcSession, Closeable {
|
||||||
|
|
||||||
protected static final ReentrantLock downlinkMsgLock = new ReentrantLock();
|
private static final int MAX_DOWNLINK_ATTEMPTS = 10;
|
||||||
protected static final ConcurrentLinkedQueue<EdgeEvent> highPriorityQueue = new ConcurrentLinkedQueue<>();
|
private static final String RATE_LIMIT_REACHED = "Rate limit reached";
|
||||||
|
|
||||||
protected static final int MAX_DOWNLINK_ATTEMPTS = 10;
|
protected static final ConcurrentLinkedQueue<EdgeEvent> highPriorityQueue = new ConcurrentLinkedQueue<>();
|
||||||
protected static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
|
|
||||||
protected static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId";
|
|
||||||
protected static final String RATE_LIMIT_REACHED = "Rate limit reached";
|
|
||||||
|
|
||||||
protected UUID sessionId;
|
protected UUID sessionId;
|
||||||
protected BiConsumer<EdgeId, T> sessionOpenListener;
|
protected BiConsumer<EdgeId, T> sessionOpenListener;
|
||||||
protected BiConsumer<Edge, UUID> sessionCloseListener;
|
protected BiConsumer<Edge, UUID> sessionCloseListener;
|
||||||
|
|
||||||
protected final EdgeSessionState sessionState = new EdgeSessionState();
|
private final EdgeSessionState sessionState = new EdgeSessionState();
|
||||||
|
private final ReentrantLock downlinkMsgLock = new ReentrantLock();
|
||||||
|
|
||||||
protected EdgeContextComponent ctx;
|
protected EdgeContextComponent ctx;
|
||||||
protected Edge edge;
|
protected Edge edge;
|
||||||
protected TenantId tenantId;
|
protected TenantId tenantId;
|
||||||
|
|
||||||
protected StreamObserver<RequestMsg> inputStream;
|
protected StreamObserver<RequestMsg> inputStream;
|
||||||
protected StreamObserver<ResponseMsg> outputStream;
|
protected StreamObserver<ResponseMsg> outputStream;
|
||||||
|
|
||||||
protected boolean connected;
|
protected boolean connected;
|
||||||
protected volatile boolean syncCompleted;
|
protected volatile boolean syncCompleted;
|
||||||
|
|
||||||
protected Long newStartTs;
|
private EdgeVersion edgeVersion;
|
||||||
protected Long previousStartTs;
|
private int maxInboundMessageSize;
|
||||||
protected Long newStartSeqId;
|
private int clientMaxInboundMessageSize;
|
||||||
protected Long previousStartSeqId;
|
private int maxHighPriorityQueueSizePerSession;
|
||||||
protected Long seqIdEnd;
|
|
||||||
|
|
||||||
protected EdgeVersion edgeVersion;
|
private ScheduledExecutorService sendDownlinkExecutorService;
|
||||||
protected int maxInboundMessageSize;
|
|
||||||
protected int clientMaxInboundMessageSize;
|
|
||||||
protected int maxHighPriorityQueueSizePerSession;
|
|
||||||
|
|
||||||
protected ScheduledExecutorService sendDownlinkExecutorService;
|
|
||||||
|
|
||||||
public AbstractEdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream,
|
public AbstractEdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream,
|
||||||
BiConsumer<EdgeId, T> sessionOpenListener,
|
BiConsumer<EdgeId, T> sessionOpenListener,
|
||||||
|
|||||||
@ -122,7 +122,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
|||||||
@Value("${edges.max_high_priority_queue_size_per_session:10000}")
|
@Value("${edges.max_high_priority_queue_size_per_session:10000}")
|
||||||
private int maxHighPriorityQueueSizePerSession;
|
private int maxHighPriorityQueueSizePerSession;
|
||||||
|
|
||||||
@Value("#{ '${queue.type:null}' == 'kafka' }")
|
@Value("#{'${queue.type:null}' == 'kafka'}")
|
||||||
private boolean isKafkaSupported;
|
private boolean isKafkaSupported;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@ -335,7 +335,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
|||||||
save(tenantId, edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, lastConnectTs);
|
save(tenantId, edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, lastConnectTs);
|
||||||
edgeIdServiceIdCache.put(edgeId, serviceInfoProvider.getServiceId());
|
edgeIdServiceIdCache.put(edgeId, serviceInfoProvider.getServiceId());
|
||||||
if (isKafkaSupported) {
|
if (isKafkaSupported) {
|
||||||
TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer = tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edgeId, serviceInfoProvider.getServiceId());
|
TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer = tbCoreQueueFactory.createEdgeEventMsgConsumer(tenantId, edgeId);
|
||||||
((KafkaEdgeGrpcSession) edgeGrpcSession).initConsumer(consumer);
|
((KafkaEdgeGrpcSession) edgeGrpcSession).initConsumer(consumer);
|
||||||
}
|
}
|
||||||
pushRuleEngineMessage(tenantId, edge, lastConnectTs, TbMsgType.CONNECT_EVENT);
|
pushRuleEngineMessage(tenantId, edge, lastConnectTs, TbMsgType.CONNECT_EVENT);
|
||||||
|
|||||||
@ -23,7 +23,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
|||||||
import org.springframework.context.ApplicationEventPublisher;
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
import org.springframework.context.annotation.Lazy;
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.thingsboard.server.cache.TbTransactionalCache;
|
|
||||||
import org.thingsboard.server.cache.limits.RateLimitService;
|
import org.thingsboard.server.cache.limits.RateLimitService;
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||||
@ -38,12 +37,11 @@ import org.thingsboard.server.common.util.ProtoUtils;
|
|||||||
import org.thingsboard.server.dao.edge.EdgeEventService;
|
import org.thingsboard.server.dao.edge.EdgeEventService;
|
||||||
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
|
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
|
||||||
import org.thingsboard.server.dao.service.DataValidator;
|
import org.thingsboard.server.dao.service.DataValidator;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos.ToEdgeEventNotificationMsg;
|
||||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||||
import org.thingsboard.server.queue.discovery.TopicService;
|
import org.thingsboard.server.queue.discovery.TopicService;
|
||||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||||
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -59,7 +57,6 @@ public class KafkaEdgeEventService implements EdgeEventService {
|
|||||||
private final TbQueueProducerProvider producerProvider;
|
private final TbQueueProducerProvider producerProvider;
|
||||||
@Lazy
|
@Lazy
|
||||||
private final TopicService topicService;
|
private final TopicService topicService;
|
||||||
private final TbTransactionalCache<EdgeId, String> edgeIdServiceIdCache;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent) {
|
public ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent) {
|
||||||
@ -70,9 +67,8 @@ public class KafkaEdgeEventService implements EdgeEventService {
|
|||||||
throw new TbRateLimitsException(EntityType.EDGE);
|
throw new TbRateLimitsException(EntityType.EDGE);
|
||||||
}
|
}
|
||||||
edgeEventValidator.validate(edgeEvent, EdgeEvent::getTenantId);
|
edgeEventValidator.validate(edgeEvent, EdgeEvent::getTenantId);
|
||||||
var serviceIdOpt = Optional.ofNullable(edgeIdServiceIdCache.get(edgeEvent.getEdgeId()));
|
TopicPartitionInfo tpi = topicService.getEdgeEventNotificationsTopic(edgeEvent.getTenantId(), edgeEvent.getEdgeId());
|
||||||
TopicPartitionInfo tpi = topicService.getEdgeEventNotificationsTopic(edgeEvent.getTenantId(), edgeEvent.getEdgeId(), serviceIdOpt.get().get());
|
ToEdgeEventNotificationMsg msg = ToEdgeEventNotificationMsg.newBuilder().setEdgeEventMsg(ProtoUtils.toProto(edgeEvent)).build();
|
||||||
TransportProtos.ToEdgeEventNotificationMsg msg = TransportProtos.ToEdgeEventNotificationMsg.newBuilder().setEdgeEventMsg(ProtoUtils.toProto(edgeEvent)).build();
|
|
||||||
producerProvider.getTbEdgeEventsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null);
|
producerProvider.getTbEdgeEventsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null);
|
||||||
|
|
||||||
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(edgeEvent.getTenantId()).entity(edgeEvent).entityId(edgeEvent.getEdgeId()).build());
|
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(edgeEvent.getTenantId()).entity(edgeEvent).entityId(edgeEvent.getEdgeId()).build());
|
||||||
@ -85,8 +81,6 @@ public class KafkaEdgeEventService implements EdgeEventService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cleanupEvents(long ttl) {
|
public void cleanupEvents(long ttl) {}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,11 +15,13 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.edge.rpc;
|
package org.thingsboard.server.service.edge.rpc;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import jakarta.annotation.PreDestroy;
|
import jakarta.annotation.PreDestroy;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||||
import org.thingsboard.server.common.data.edge.Edge;
|
import org.thingsboard.server.common.data.edge.Edge;
|
||||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||||
import org.thingsboard.server.common.data.id.EdgeId;
|
import org.thingsboard.server.common.data.id.EdgeId;
|
||||||
@ -83,8 +85,21 @@ public class KafkaEdgeGrpcSession extends AbstractEdgeGrpcSession<KafkaEdgeGrpcS
|
|||||||
}
|
}
|
||||||
|
|
||||||
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents);
|
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents);
|
||||||
sendDownlinkMsgsPack(downlinkMsgsPack).get();
|
Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(@Nullable Boolean isInterrupted) {
|
||||||
|
if (Boolean.TRUE.equals(isInterrupted)) {
|
||||||
|
log.debug("[{}][{}][{}] Send downlink messages task was interrupted", tenantId, edge.getId(), sessionId);
|
||||||
|
} else {
|
||||||
edgeEventsConsumer.commit();
|
edgeEventsConsumer.commit();
|
||||||
|
processEdgeEvents();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
log.error("[{}] Failed to send downlink msgs pack", sessionId, t);
|
||||||
|
}
|
||||||
|
}, ctx.getGrpcCallbackExecutorService());
|
||||||
return Futures.immediateFuture(Boolean.TRUE);
|
return Futures.immediateFuture(Boolean.TRUE);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("[{}][{}] Error occurred while polling edge events from Kafka: {}", tenantId, edge.getId(), e.getMessage());
|
log.error("[{}][{}] Error occurred while polling edge events from Kafka: {}", tenantId, edge.getId(), e.getMessage());
|
||||||
|
|||||||
@ -46,6 +46,15 @@ import java.util.function.BiConsumer;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class PostgresEdgeGrpcSession extends AbstractEdgeGrpcSession<PostgresEdgeGrpcSession> {
|
public class PostgresEdgeGrpcSession extends AbstractEdgeGrpcSession<PostgresEdgeGrpcSession> {
|
||||||
|
|
||||||
|
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
|
||||||
|
private static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId";
|
||||||
|
|
||||||
|
private Long newStartTs;
|
||||||
|
private Long previousStartTs;
|
||||||
|
private Long newStartSeqId;
|
||||||
|
private Long previousStartSeqId;
|
||||||
|
private Long seqIdEnd;
|
||||||
|
|
||||||
PostgresEdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream,
|
PostgresEdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream,
|
||||||
BiConsumer<EdgeId, PostgresEdgeGrpcSession> sessionOpenListener,
|
BiConsumer<EdgeId, PostgresEdgeGrpcSession> sessionOpenListener,
|
||||||
BiConsumer<Edge, UUID> sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService,
|
BiConsumer<Edge, UUID> sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService,
|
||||||
|
|||||||
@ -20,9 +20,7 @@ import com.google.common.util.concurrent.Futures;
|
|||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.context.annotation.Lazy;
|
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.server.cache.TbTransactionalCache;
|
|
||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
import org.thingsboard.server.common.data.EdgeUtils;
|
import org.thingsboard.server.common.data.EdgeUtils;
|
||||||
import org.thingsboard.server.common.data.EntityType;
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
@ -57,9 +55,6 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
|||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.queue.TbQueueCallback;
|
import org.thingsboard.server.queue.TbQueueCallback;
|
||||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
||||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
|
||||||
import org.thingsboard.server.queue.discovery.TopicService;
|
|
||||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
|
||||||
import org.thingsboard.server.service.edge.EdgeContextComponent;
|
import org.thingsboard.server.service.edge.EdgeContextComponent;
|
||||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||||
import org.thingsboard.server.service.state.DefaultDeviceStateService;
|
import org.thingsboard.server.service.state.DefaultDeviceStateService;
|
||||||
@ -88,9 +83,6 @@ public abstract class BaseEdgeProcessor {
|
|||||||
@Autowired
|
@Autowired
|
||||||
protected DbCallbackExecutorService dbCallbackExecutorService;
|
protected DbCallbackExecutorService dbCallbackExecutorService;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private TbTransactionalCache<EdgeId, String> edgeIdServiceIdCache;
|
|
||||||
|
|
||||||
protected ListenableFuture<Void> saveEdgeEvent(TenantId tenantId,
|
protected ListenableFuture<Void> saveEdgeEvent(TenantId tenantId,
|
||||||
EdgeId edgeId,
|
EdgeId edgeId,
|
||||||
EdgeEventType type,
|
EdgeEventType type,
|
||||||
|
|||||||
@ -28,7 +28,8 @@ public abstract class AbstractCleanUpService {
|
|||||||
|
|
||||||
private final PartitionService partitionService;
|
private final PartitionService partitionService;
|
||||||
|
|
||||||
protected boolean isSystemTenantPartitionMine(){
|
protected boolean isSystemTenantPartitionMine() {
|
||||||
return partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition();
|
return partitionService.resolve(ServiceType.TB_CORE, TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID).isMyPartition();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -32,7 +32,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TABLE_N
|
|||||||
@TbCoreComponent
|
@TbCoreComponent
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
@ConditionalOnExpression("${sql.ttl.edge_events.enabled:true} && ${sql.ttl.edge_events.edge_events_ttl:0} > 0")
|
@ConditionalOnExpression("${edges.enabled:true} && ${sql.ttl.edge_events.edge_events_ttl:0} > 0")
|
||||||
public class EdgeEventsCleanUpService extends AbstractCleanUpService {
|
public class EdgeEventsCleanUpService extends AbstractCleanUpService {
|
||||||
|
|
||||||
public static final String RANDOM_DELAY_INTERVAL_MS_EXPRESSION =
|
public static final String RANDOM_DELAY_INTERVAL_MS_EXPRESSION =
|
||||||
|
|||||||
@ -0,0 +1,111 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2024 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.service.ttl;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||||
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
|
import org.thingsboard.server.common.data.edge.Edge;
|
||||||
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||||
|
import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||||
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
|
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||||
|
import org.thingsboard.server.dao.edge.EdgeService;
|
||||||
|
import org.thingsboard.server.dao.tenant.TenantService;
|
||||||
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
|
import org.thingsboard.server.queue.discovery.TopicService;
|
||||||
|
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
|
||||||
|
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
|
||||||
|
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
|
||||||
|
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||||
|
import org.thingsboard.server.service.state.DefaultDeviceStateService;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@TbCoreComponent
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@ConditionalOnExpression("'${queue.type:null}'=='kafka' && ${edges.enabled:true}")
|
||||||
|
public class KafkaEdgeTopicsCleanUpService {
|
||||||
|
|
||||||
|
private static final long ONE_MONTH_MILLIS = TimeUnit.DAYS.toChronoUnit().getDuration().multipliedBy(30).toMillis();
|
||||||
|
|
||||||
|
private final EdgeService edgeService;
|
||||||
|
private final TenantService tenantService;
|
||||||
|
private final AttributesService attributesService;
|
||||||
|
|
||||||
|
private final TopicService topicService;
|
||||||
|
private final PartitionService partitionService;
|
||||||
|
|
||||||
|
private final TbKafkaSettings kafkaSettings;
|
||||||
|
private final TbKafkaTopicConfigs kafkaTopicConfigs;
|
||||||
|
|
||||||
|
private final ExecutorService executorService = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("kafka-edge-topic-cleanup"));
|
||||||
|
|
||||||
|
@Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.edge_events.checking_interval})}", fixedDelayString = "${sql.ttl.edge_events.checking_interval}")
|
||||||
|
public void cleanUp() {
|
||||||
|
executorService.submit(() -> {
|
||||||
|
PageDataIterable<TenantId> tenants = new PageDataIterable<>(tenantService::findTenantsIds, 10_000);
|
||||||
|
for (TenantId tenantId : tenants) {
|
||||||
|
try {
|
||||||
|
cleanUp(tenantId);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Failed to drop kafka topics for tenant {}", tenantId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanUp(TenantId tenantId) throws Exception {
|
||||||
|
if (!partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
PageDataIterable<Edge> edges = new PageDataIterable<>(link -> edgeService.findEdgesByTenantId(tenantId, link), 1024);
|
||||||
|
long currentTimeMillis = System.currentTimeMillis();
|
||||||
|
|
||||||
|
for (Edge edge : edges) {
|
||||||
|
Optional<AttributeKvEntry> attributeOpt = attributesService.find(tenantId, edge.getId(), AttributeScope.SERVER_SCOPE, DefaultDeviceStateService.LAST_CONNECT_TIME).get();
|
||||||
|
if (attributeOpt.isPresent()) {
|
||||||
|
Optional<Long> lastConnectTimeOpt = attributeOpt.get().getLongValue();
|
||||||
|
if (lastConnectTimeOpt.isPresent() && isTopicExpired(lastConnectTimeOpt.get(), currentTimeMillis)) {
|
||||||
|
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edge.getId()).getTopic();
|
||||||
|
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
|
||||||
|
if (kafkaAdmin.isTopicEmpty(topic)) {
|
||||||
|
kafkaAdmin.deleteTopic(topic);
|
||||||
|
log.info("Removed outdated topic for tenant {} and edge {} older than {}", tenantId, edge.getName(), Date.from(Instant.ofEpochMilli(currentTimeMillis - ONE_MONTH_MILLIS)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isTopicExpired(long lastConnectTime, long currentTimeMillis) {
|
||||||
|
return lastConnectTime + ONE_MONTH_MILLIS < currentTimeMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -1554,6 +1554,9 @@ queue:
|
|||||||
housekeeper-reprocessing: "${TB_QUEUE_KAFKA_HOUSEKEEPER_REPROCESSING_TOPIC_PROPERTIES:retention.ms:7776000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
|
housekeeper-reprocessing: "${TB_QUEUE_KAFKA_HOUSEKEEPER_REPROCESSING_TOPIC_PROPERTIES:retention.ms:7776000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
|
||||||
# Kafka properties for Edge topic
|
# Kafka properties for Edge topic
|
||||||
edge: "${TB_QUEUE_KAFKA_EDGE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
|
edge: "${TB_QUEUE_KAFKA_EDGE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
|
||||||
|
# Kafka properties for Edge event topic
|
||||||
|
# TODO : discuss and adjust properties
|
||||||
|
edge-event: "${TB_QUEUE_KAFKA_EDGE_EVENT_TOPIC_PROPERTIES:retention.ms:2592000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
|
||||||
consumer-stats:
|
consumer-stats:
|
||||||
# Prints lag between consumer group offset and last messages offset in Kafka topics
|
# Prints lag between consumer group offset and last messages offset in Kafka topics
|
||||||
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
|
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
|
||||||
|
|||||||
@ -636,10 +636,6 @@ message ProcessingStrategyProto {
|
|||||||
int64 maxPauseBetweenRetries = 5;
|
int64 maxPauseBetweenRetries = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
message EdgeEvent {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Main Messages;
|
* Main Messages;
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -61,12 +61,12 @@ public class TopicService {
|
|||||||
return buildTopicPartitionInfo("tb_edge.notifications." + serviceId, null, null, false);
|
return buildTopicPartitionInfo("tb_edge.notifications." + serviceId, null, null, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TopicPartitionInfo getEdgeEventNotificationsTopic(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TopicPartitionInfo getEdgeEventNotificationsTopic(TenantId tenantId, EdgeId edgeId) {
|
||||||
return tbEdgeEventsNotificationTopics.computeIfAbsent(edgeId, id -> buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId, serviceId));
|
return tbEdgeEventsNotificationTopics.computeIfAbsent(edgeId, id -> buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId));
|
||||||
}
|
}
|
||||||
|
|
||||||
private TopicPartitionInfo buildEdgeEventNotificationsTopicPartitionInfo(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TopicPartitionInfo buildEdgeEventNotificationsTopicPartitionInfo(TenantId tenantId, EdgeId edgeId) {
|
||||||
return buildTopicPartitionInfo("tb_edge_event.notifications." + tenantId + "." + edgeId + "." + serviceId, null, null, false);
|
return buildTopicPartitionInfo("tb_edge_event.notifications." + tenantId + "." + edgeId, null, null, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private TopicPartitionInfo buildNotificationsTopicPartitionInfo(ServiceType serviceType, String serviceId) {
|
private TopicPartitionInfo buildNotificationsTopicPartitionInfo(ServiceType serviceType, String serviceId) {
|
||||||
|
|||||||
@ -170,7 +170,16 @@ public class TbKafkaAdmin implements TbQueueAdmin {
|
|||||||
log.info("[{}] altered new consumer groupId {}", tp, newGroupId);
|
log.info("[{}] altered new consumer groupId {}", tp, newGroupId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTopicEmpty(String topic) {
|
||||||
|
try {
|
||||||
|
Map<TopicPartition, OffsetAndMetadata> offsets = settings.getAdminClient().listConsumerGroupOffsets("__consumer_offsets").partitionsToOffsetAndMetadata().get();
|
||||||
|
return offsets.entrySet().stream().noneMatch(entry -> entry.getKey().topic().equals(topic));
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
log.error("Failed to check if topic [{}] is empty.", topic, e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -46,6 +46,8 @@ public class TbKafkaTopicConfigs {
|
|||||||
private String vcProperties;
|
private String vcProperties;
|
||||||
@Value("${queue.kafka.topic-properties.edge:}")
|
@Value("${queue.kafka.topic-properties.edge:}")
|
||||||
private String edgeProperties;
|
private String edgeProperties;
|
||||||
|
@Value("${queue.kafka.topic-properties.edge-event:}")
|
||||||
|
private String edgeEventProperties;
|
||||||
@Value("${queue.kafka.topic-properties.housekeeper:}")
|
@Value("${queue.kafka.topic-properties.housekeeper:}")
|
||||||
private String housekeeperProperties;
|
private String housekeeperProperties;
|
||||||
@Value("${queue.kafka.topic-properties.housekeeper-reprocessing:}")
|
@Value("${queue.kafka.topic-properties.housekeeper-reprocessing:}")
|
||||||
@ -75,6 +77,8 @@ public class TbKafkaTopicConfigs {
|
|||||||
private Map<String, String> housekeeperReprocessingConfigs;
|
private Map<String, String> housekeeperReprocessingConfigs;
|
||||||
@Getter
|
@Getter
|
||||||
private Map<String, String> edgeConfigs;
|
private Map<String, String> edgeConfigs;
|
||||||
|
@Getter
|
||||||
|
private Map<String, String> edgeEventConfigs;
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
private void init() {
|
private void init() {
|
||||||
@ -92,6 +96,7 @@ public class TbKafkaTopicConfigs {
|
|||||||
housekeeperConfigs = PropertyUtils.getProps(housekeeperProperties);
|
housekeeperConfigs = PropertyUtils.getProps(housekeeperProperties);
|
||||||
housekeeperReprocessingConfigs = PropertyUtils.getProps(housekeeperReprocessingProperties);
|
housekeeperReprocessingConfigs = PropertyUtils.getProps(housekeeperReprocessingProperties);
|
||||||
edgeConfigs = PropertyUtils.getProps(edgeProperties);
|
edgeConfigs = PropertyUtils.getProps(edgeProperties);
|
||||||
|
edgeEventConfigs = PropertyUtils.getProps(edgeEventProperties);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -285,7 +285,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -265,7 +265,7 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -207,7 +207,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -93,6 +93,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
|||||||
private final TbQueueAdmin housekeeperAdmin;
|
private final TbQueueAdmin housekeeperAdmin;
|
||||||
private final TbQueueAdmin housekeeperReprocessingAdmin;
|
private final TbQueueAdmin housekeeperReprocessingAdmin;
|
||||||
private final TbQueueAdmin edgeAdmin;
|
private final TbQueueAdmin edgeAdmin;
|
||||||
|
private final TbQueueAdmin edgeEventAdmin;
|
||||||
|
|
||||||
private final AtomicLong consumerCount = new AtomicLong();
|
private final AtomicLong consumerCount = new AtomicLong();
|
||||||
|
|
||||||
@ -131,6 +132,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
|||||||
this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs());
|
this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs());
|
||||||
this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs());
|
this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs());
|
||||||
this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
|
this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
|
||||||
|
this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -466,14 +468,14 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
||||||
consumerBuilder.settings(kafkaSettings);
|
consumerBuilder.settings(kafkaSettings);
|
||||||
consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId + "." + serviceId));
|
consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId));
|
||||||
consumerBuilder.clientId("monolith-to-edge-event-consumer" + serviceInfoProvider.getServiceId());
|
consumerBuilder.clientId("monolith-to-edge-event-consumer" + serviceInfoProvider.getServiceId());
|
||||||
consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer"));
|
consumerBuilder.groupId(topicService.buildTopicName("monolith-edge-event-consumer"));
|
||||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
consumerBuilder.admin(edgeAdmin);
|
consumerBuilder.admin(edgeEventAdmin);
|
||||||
consumerBuilder.statsService(consumerStatsService);
|
consumerBuilder.statsService(consumerStatsService);
|
||||||
return consumerBuilder.build();
|
return consumerBuilder.build();
|
||||||
}
|
}
|
||||||
@ -484,7 +486,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
|
|||||||
requestBuilder.settings(kafkaSettings);
|
requestBuilder.settings(kafkaSettings);
|
||||||
requestBuilder.clientId("monolith-to-edge-event-" + serviceInfoProvider.getServiceId());
|
requestBuilder.clientId("monolith-to-edge-event-" + serviceInfoProvider.getServiceId());
|
||||||
requestBuilder.defaultTopic(topicService.buildTopicName("edge-events"));
|
requestBuilder.defaultTopic(topicService.buildTopicName("edge-events"));
|
||||||
requestBuilder.admin(edgeAdmin);
|
requestBuilder.admin(edgeEventAdmin);
|
||||||
return requestBuilder.build();
|
return requestBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -92,6 +92,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
private final TbQueueAdmin housekeeperAdmin;
|
private final TbQueueAdmin housekeeperAdmin;
|
||||||
private final TbQueueAdmin housekeeperReprocessingAdmin;
|
private final TbQueueAdmin housekeeperReprocessingAdmin;
|
||||||
private final TbQueueAdmin edgeAdmin;
|
private final TbQueueAdmin edgeAdmin;
|
||||||
|
private final TbQueueAdmin edgeEventAdmin;
|
||||||
|
|
||||||
private final AtomicLong consumerCount = new AtomicLong();
|
private final AtomicLong consumerCount = new AtomicLong();
|
||||||
|
|
||||||
@ -131,6 +132,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs());
|
this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs());
|
||||||
this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs());
|
this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs());
|
||||||
this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
|
this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
|
||||||
|
this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -415,14 +417,14 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
|
||||||
consumerBuilder.settings(kafkaSettings);
|
consumerBuilder.settings(kafkaSettings);
|
||||||
consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId + "." + serviceId));
|
consumerBuilder.topic(topicService.buildTopicName("tb_edge_event.notifications." + tenantId + "." + edgeId));
|
||||||
consumerBuilder.clientId("tb-core-edge-event-consumer" + serviceInfoProvider.getServiceId());
|
consumerBuilder.clientId("tb-core-edge-event-consumer" + serviceInfoProvider.getServiceId());
|
||||||
consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer"));
|
consumerBuilder.groupId(topicService.buildTopicName("tb-core-edge-event-consumer"));
|
||||||
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()));
|
||||||
consumerBuilder.admin(edgeAdmin);
|
consumerBuilder.admin(edgeEventAdmin);
|
||||||
consumerBuilder.statsService(consumerStatsService);
|
consumerBuilder.statsService(consumerStatsService);
|
||||||
return consumerBuilder.build();
|
return consumerBuilder.build();
|
||||||
}
|
}
|
||||||
@ -433,7 +435,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
requestBuilder.settings(kafkaSettings);
|
requestBuilder.settings(kafkaSettings);
|
||||||
requestBuilder.clientId("tb-core-edge-event-" + serviceInfoProvider.getServiceId());
|
requestBuilder.clientId("tb-core-edge-event-" + serviceInfoProvider.getServiceId());
|
||||||
requestBuilder.defaultTopic(topicService.buildTopicName("edge-events"));
|
requestBuilder.defaultTopic(topicService.buildTopicName("edge-events"));
|
||||||
requestBuilder.admin(edgeAdmin);
|
requestBuilder.admin(edgeEventAdmin);
|
||||||
return requestBuilder.build();
|
return requestBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -80,6 +80,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
|
|||||||
private final TbQueueAdmin fwUpdatesAdmin;
|
private final TbQueueAdmin fwUpdatesAdmin;
|
||||||
private final TbQueueAdmin housekeeperAdmin;
|
private final TbQueueAdmin housekeeperAdmin;
|
||||||
private final TbQueueAdmin edgeAdmin;
|
private final TbQueueAdmin edgeAdmin;
|
||||||
|
private final TbQueueAdmin edgeEventAdmin;
|
||||||
private final AtomicLong consumerCount = new AtomicLong();
|
private final AtomicLong consumerCount = new AtomicLong();
|
||||||
|
|
||||||
public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings,
|
public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings,
|
||||||
@ -109,6 +110,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
|
|||||||
this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
|
this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
|
||||||
this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs());
|
this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs());
|
||||||
this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
|
this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
|
||||||
|
this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -197,7 +199,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
|
|||||||
requestBuilder.settings(kafkaSettings);
|
requestBuilder.settings(kafkaSettings);
|
||||||
requestBuilder.clientId("tb-rule-engine-edge-event-" + serviceInfoProvider.getServiceId());
|
requestBuilder.clientId("tb-rule-engine-edge-event-" + serviceInfoProvider.getServiceId());
|
||||||
requestBuilder.defaultTopic(topicService.buildTopicName("edge-events"));
|
requestBuilder.defaultTopic(topicService.buildTopicName("edge-events"));
|
||||||
requestBuilder.admin(edgeAdmin);
|
requestBuilder.admin(edgeEventAdmin);
|
||||||
return requestBuilder.build();
|
return requestBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -286,7 +286,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -258,7 +258,7 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -284,7 +284,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -174,7 +174,7 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -283,7 +283,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -258,7 +258,7 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId) {
|
public TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -151,7 +151,7 @@ public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory, Hous
|
|||||||
|
|
||||||
TbQueueProducer<TbProtoQueueMsg<ToEdgeNotificationMsg>> createEdgeNotificationsMsgProducer();
|
TbQueueProducer<TbProtoQueueMsg<ToEdgeNotificationMsg>> createEdgeNotificationsMsgProducer();
|
||||||
|
|
||||||
TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId, String serviceId);
|
TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId);
|
||||||
|
|
||||||
TbQueueProducer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgProducer();
|
TbQueueProducer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> createEdgeEventMsgProducer();
|
||||||
|
|
||||||
|
|||||||
@ -251,6 +251,7 @@ public class EdgeServiceImpl extends AbstractCachedEntityService<EdgeCacheKey, E
|
|||||||
}
|
}
|
||||||
edgeDao.removeById(tenantId, edgeId.getId());
|
edgeDao.removeById(tenantId, edgeId.getId());
|
||||||
|
|
||||||
|
|
||||||
publishEvictEvent(new EdgeCacheEvictEvent(edge.getTenantId(), edge.getName(), null));
|
publishEvictEvent(new EdgeCacheEvictEvent(edge.getTenantId(), edge.getName(), null));
|
||||||
eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entityId(edgeId).build());
|
eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entityId(edgeId).build());
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user