Fixes after manual testing

This commit is contained in:
Dmytro Skarzhynets 2023-12-15 09:43:49 +02:00
parent 7431233f5d
commit 315202d9df
4 changed files with 8 additions and 39 deletions

View File

@ -63,7 +63,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509Ce
import org.thingsboard.server.gen.transport.TransportProtos.ValidateOrCreateDeviceX509CertRequestMsg;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@ -162,7 +161,5 @@ public interface TransportService {
boolean hasSession(SessionInfoProto sessionInfo);
SessionMetaData getSession(UUID sessionId);
void createGaugeStats(String openConnections, AtomicInteger connectionsCounter);
}

View File

@ -31,16 +31,15 @@
package org.thingsboard.server.common.transport.activity;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.transport.activity.strategy.ActivityStrategy;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -48,9 +47,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
public abstract class AbstractActivityManager<Key, Metadata> implements ActivityManager<Key> {
private final ConcurrentMap<Key, ActivityState<Metadata>> states = new ConcurrentHashMap<>();
@Autowired
protected SchedulerComponent scheduler;
protected String name;
private long reportingPeriodMillis;
private ScheduledExecutorService scheduler;
private boolean initialized;
@Override
@ -62,8 +63,6 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
reportingPeriodMillis = 3000;
log.error("[{}] Negative or zero reporting period millisecond was provided. Going to use reporting period value of 3 seconds.", this.name);
}
this.reportingPeriodMillis = reportingPeriodMillis;
scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(this.name)); // TODO: use scheduler component
scheduler.scheduleAtFixedRate(this::onReportingPeriodEnd, new Random().nextInt((int) reportingPeriodMillis), reportingPeriodMillis, TimeUnit.MILLISECONDS);
initialized = true;
log.info("Activity manager with name [{}] is initialized.", this.name);
@ -172,7 +171,7 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
if (shouldReport && lastReportedTime < lastRecordedTime) {
log.debug("[{}] Going to report last activity event for key: [{}].", name, key);
reportActivity(key, metadata, currentState.getLastRecordedTime(), new ActivityReportCallback<>() {
reportActivity(key, metadata, lastRecordedTime, new ActivityReportCallback<>() {
@Override
public void onSuccess(Key key, long reportedTime) {
updateLastReportedTime(key, reportedTime);
@ -194,22 +193,4 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
});
}
@Override
public synchronized void destroy() {
if (initialized) {
initialized = false;
if (scheduler != null) {
scheduler.shutdown();
try {
if (scheduler.awaitTermination(10L, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}
}

View File

@ -36,6 +36,4 @@ public interface ActivityManager<Key> {
void onActivity(Key key);
void destroy();
}

View File

@ -186,7 +186,6 @@ public class DefaultTransportService extends AbstractActivityManager<UUID, Trans
private final TransportRateLimitService rateLimitService;
private final DataDecodingEncodingService dataDecodingEncodingService;
private final SchedulerComponent scheduler;
private final ApplicationEventPublisher eventPublisher;
private final TransportResourceCache transportResourceCache;
private final NotificationRuleProcessor notificationRuleProcessor;
@ -295,7 +294,6 @@ public class DefaultTransportService extends AbstractActivityManager<UUID, Trans
@PreDestroy
public void destroy() {
stopped = true;
super.destroy();
if (transportNotificationsConsumer != null) {
transportNotificationsConsumer.unsubscribe();
@ -838,7 +836,7 @@ public class DefaultTransportService extends AbstractActivityManager<UUID, Trans
@Override
protected boolean hasExpired(UUID uuid, ActivityState<TransportProtos.SessionInfoProto> state) {
return (System.currentTimeMillis() - sessionInactivityTimeout) < state.getLastReportedTime();
return (System.currentTimeMillis() - sessionInactivityTimeout) > state.getLastRecordedTime();
}
@Override
@ -1351,11 +1349,6 @@ public class DefaultTransportService extends AbstractActivityManager<UUID, Trans
return sessions.containsKey(toSessionId(sessionInfo));
}
@Override
public SessionMetaData getSession(UUID sessionId) {
return sessions.get(sessionId);
}
@Override
public void createGaugeStats(String statsName, AtomicInteger number) {
statsFactory.createGauge(StatsType.TRANSPORT + "." + statsName, number);