Rule engine transport stats
This commit is contained in:
		
							parent
							
								
									d40c054ca3
								
							
						
					
					
						commit
						84e391963e
					
				@ -16,7 +16,6 @@
 | 
			
		||||
package org.thingsboard.server.service.transport;
 | 
			
		||||
 | 
			
		||||
import akka.actor.ActorRef;
 | 
			
		||||
import com.fasterxml.jackson.databind.ObjectMapper;
 | 
			
		||||
import io.github.bucket4j.Bandwidth;
 | 
			
		||||
import io.github.bucket4j.BlockingBucket;
 | 
			
		||||
import io.github.bucket4j.Bucket4j;
 | 
			
		||||
@ -30,11 +29,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Value;
 | 
			
		||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 | 
			
		||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
 | 
			
		||||
import org.springframework.context.event.ContextRefreshedEvent;
 | 
			
		||||
import org.springframework.context.event.EventListener;
 | 
			
		||||
import org.springframework.scheduling.annotation.Scheduled;
 | 
			
		||||
import org.springframework.stereotype.Service;
 | 
			
		||||
import org.thingsboard.server.actors.ActorSystemContext;
 | 
			
		||||
import org.thingsboard.server.actors.service.ActorService;
 | 
			
		||||
import org.thingsboard.server.common.msg.cluster.ServerAddress;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
 | 
			
		||||
@ -44,7 +42,6 @@ import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
 | 
			
		||||
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
 | 
			
		||||
import org.thingsboard.server.kafka.TbKafkaSettings;
 | 
			
		||||
import org.thingsboard.server.kafka.TbNodeIdProvider;
 | 
			
		||||
import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
 | 
			
		||||
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
 | 
			
		||||
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
 | 
			
		||||
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
 | 
			
		||||
@ -83,6 +80,8 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
 | 
			
		||||
    private long pollRecordsPerSecond;
 | 
			
		||||
    @Value("${transport.remote.rule_engine.max_poll_records_per_minute}")
 | 
			
		||||
    private long pollRecordsPerMinute;
 | 
			
		||||
    @Value("${transport.remote.rule_engine.stats.enabled:false}")
 | 
			
		||||
    private boolean statsEnabled;
 | 
			
		||||
 | 
			
		||||
    @Autowired
 | 
			
		||||
    private TbKafkaSettings kafkaSettings;
 | 
			
		||||
@ -108,6 +107,8 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
 | 
			
		||||
 | 
			
		||||
    private volatile boolean stopped = false;
 | 
			
		||||
 | 
			
		||||
    private final RuleEngineStats stats = new RuleEngineStats();
 | 
			
		||||
 | 
			
		||||
    @PostConstruct
 | 
			
		||||
    public void init() {
 | 
			
		||||
        TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToTransportMsg> notificationsProducerBuilder = TBKafkaProducerTemplate.builder();
 | 
			
		||||
@ -176,6 +177,13 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Scheduled(fixedDelayString = "${transport.remote.rule_engine.stats.print_interval_ms}")
 | 
			
		||||
    public void printStats() {
 | 
			
		||||
        if (statsEnabled) {
 | 
			
		||||
            stats.printStats();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public void process(String nodeId, DeviceActorToTransportMsg msg) {
 | 
			
		||||
        process(nodeId, msg, null, null);
 | 
			
		||||
@ -191,6 +199,9 @@ public class RemoteRuleEngineTransportService implements RuleEngineTransportServ
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg) {
 | 
			
		||||
        if (statsEnabled) {
 | 
			
		||||
            stats.log(toDeviceActorMsg);
 | 
			
		||||
        }
 | 
			
		||||
        TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg);
 | 
			
		||||
        Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId());
 | 
			
		||||
        if (address.isPresent()) {
 | 
			
		||||
 | 
			
		||||
@ -0,0 +1,80 @@
 | 
			
		||||
/**
 | 
			
		||||
 * Copyright © 2016-2019 The Thingsboard Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package org.thingsboard.server.service.transport;
 | 
			
		||||
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.thingsboard.server.gen.transport.TransportProtos;
 | 
			
		||||
 | 
			
		||||
import java.util.concurrent.atomic.AtomicInteger;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
public class RuleEngineStats {
 | 
			
		||||
 | 
			
		||||
    private final AtomicInteger totalCounter = new AtomicInteger(0);
 | 
			
		||||
    private final AtomicInteger sessionEventCounter = new AtomicInteger(0);
 | 
			
		||||
    private final AtomicInteger postTelemetryCounter = new AtomicInteger(0);
 | 
			
		||||
    private final AtomicInteger postAttributesCounter = new AtomicInteger(0);
 | 
			
		||||
    private final AtomicInteger getAttributesCounter = new AtomicInteger(0);
 | 
			
		||||
    private final AtomicInteger subscribeToAttributesCounter = new AtomicInteger(0);
 | 
			
		||||
    private final AtomicInteger subscribeToRPCCounter = new AtomicInteger(0);
 | 
			
		||||
    private final AtomicInteger toDeviceRPCCallResponseCounter = new AtomicInteger(0);
 | 
			
		||||
    private final AtomicInteger toServerRPCCallRequestCounter = new AtomicInteger(0);
 | 
			
		||||
    private final AtomicInteger subscriptionInfoCounter = new AtomicInteger(0);
 | 
			
		||||
    private final AtomicInteger claimDeviceCounter = new AtomicInteger(0);
 | 
			
		||||
 | 
			
		||||
    public void log(TransportProtos.TransportToDeviceActorMsg msg) {
 | 
			
		||||
        totalCounter.incrementAndGet();
 | 
			
		||||
        if (msg.hasSessionEvent()) {
 | 
			
		||||
            sessionEventCounter.incrementAndGet();
 | 
			
		||||
        }
 | 
			
		||||
        if (msg.hasPostTelemetry()) {
 | 
			
		||||
            postTelemetryCounter.incrementAndGet();
 | 
			
		||||
        }
 | 
			
		||||
        if (msg.hasPostAttributes()) {
 | 
			
		||||
            postAttributesCounter.incrementAndGet();
 | 
			
		||||
        }
 | 
			
		||||
        if (msg.hasGetAttributes()) {
 | 
			
		||||
            getAttributesCounter.incrementAndGet();
 | 
			
		||||
        }
 | 
			
		||||
        if (msg.hasSubscribeToAttributes()) {
 | 
			
		||||
            subscribeToAttributesCounter.incrementAndGet();
 | 
			
		||||
        }
 | 
			
		||||
        if (msg.hasSubscribeToRPC()) {
 | 
			
		||||
            subscribeToRPCCounter.incrementAndGet();
 | 
			
		||||
        }
 | 
			
		||||
        if (msg.hasToDeviceRPCCallResponse()) {
 | 
			
		||||
            toDeviceRPCCallResponseCounter.incrementAndGet();
 | 
			
		||||
        }
 | 
			
		||||
        if (msg.hasToServerRPCCallRequest()) {
 | 
			
		||||
            toServerRPCCallRequestCounter.incrementAndGet();
 | 
			
		||||
        }
 | 
			
		||||
        if (msg.hasSubscriptionInfo()) {
 | 
			
		||||
            subscriptionInfoCounter.incrementAndGet();
 | 
			
		||||
        }
 | 
			
		||||
        if (msg.hasClaimDevice()) {
 | 
			
		||||
            claimDeviceCounter.incrementAndGet();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void printStats() {
 | 
			
		||||
        log.info("Transport total [{}] sessionEvents [{}] telemetry [{}] attributes [{}] getAttr [{}] subToAttr [{}] subToRpc [{}] toDevRpc [{}] " +
 | 
			
		||||
                        "toServerRpc [{}] subInfo [{}] claimDevice [{}] ",
 | 
			
		||||
                totalCounter.getAndSet(0), sessionEventCounter.getAndSet(0), postTelemetryCounter.getAndSet(0),
 | 
			
		||||
                postAttributesCounter.getAndSet(0), getAttributesCounter.getAndSet(0), subscribeToAttributesCounter.getAndSet(0),
 | 
			
		||||
                subscribeToRPCCounter.getAndSet(0), toDeviceRPCCallResponseCounter.getAndSet(0),
 | 
			
		||||
                toServerRPCCallRequestCounter.getAndSet(0), subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0));
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -479,6 +479,9 @@ transport:
 | 
			
		||||
      poll_records_pack_size: "${TB_RULE_ENGINE_MAX_POLL_RECORDS:1000}"
 | 
			
		||||
      max_poll_records_per_second: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_SECOND:10000}"
 | 
			
		||||
      max_poll_records_per_minute: "${TB_RULE_ENGINE_MAX_POLL_RECORDS_PER_MINUTE:120000}"
 | 
			
		||||
      stats:
 | 
			
		||||
        enabled: "${TB_RULE_ENGINE_STATS_ENABLED:false}"
 | 
			
		||||
        print_interval_ms: "${TB_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
 | 
			
		||||
    notifications:
 | 
			
		||||
      topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
 | 
			
		||||
  sessions:
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user