Polishing after review

This commit is contained in:
Dmytro Skarzhynets 2023-12-09 10:05:06 +02:00 committed by Dmytro Skarzhynets
parent 8b3ad49ed8
commit 6d15bf82df
6 changed files with 40 additions and 49 deletions

View File

@ -115,31 +115,33 @@ public class FirstOnlyIntegrationActivityManager extends AbstractActivityManager
@Override @Override
protected void doOnReportingPeriodEnd() { protected void doOnReportingPeriodEnd() {
for (Map.Entry<IntegrationActivityKey, ActivityStateWrapper> entry : states.entrySet()) { for (Map.Entry<IntegrationActivityKey, ActivityStateWrapper> entry : states.entrySet()) {
var activityKey = entry.getKey();
var activityStateWrapper = entry.getValue(); var activityStateWrapper = entry.getValue();
if (activityStateWrapper.isAlreadyBeenReported()) {
activityStateWrapper.setAlreadyBeenReported(false);
continue;
}
var activityKey = entry.getKey();
var activityState = activityStateWrapper.getState(); var activityState = activityStateWrapper.getState();
long lastRecordedTime = activityState.getLastRecordedTime(); long lastRecordedTime = activityState.getLastRecordedTime();
// if there were no activities during the reporting period, we should remove the entry to prevent memory leaks // if there were no activities during the reporting period, we should remove the entry to prevent memory leaks
if (!activityStateWrapper.isAlreadyBeenReported()) { log.debug("[{}][{}] No activity events were received during reporting period for device with id: [{}]. Going to remove activity state.",
log.debug("[{}][{}] No activity events were received during reporting period for device with id: [{}]. Going to remove activity state.", activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId());
activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId()); states.remove(activityKey);
states.remove(activityKey); // report leftover events
// report leftover events if (activityState.getLastReportedTime() < lastRecordedTime) {
if (activityState.getLastReportedTime() < lastRecordedTime) { log.debug("[{}][{}] Going to report leftover activity event for device with id: [{}].", activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId());
log.debug("[{}][{}] Going to report leftover activity event for device with id: [{}].", activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId()); reporter.report(activityKey, lastRecordedTime, activityState, new ActivityReportCallback<>() {
reporter.report(activityKey, lastRecordedTime, activityState, new ActivityReportCallback<>() { @Override
@Override public void onSuccess(IntegrationActivityKey key, long reportedTime) {
public void onSuccess(IntegrationActivityKey key, long reportedTime) { updateLastReportedTime(key, reportedTime); // just in case the same key was added again
updateLastReportedTime(key, reportedTime); // just in case the same key was added again }
}
@Override @Override
public void onFailure(IntegrationActivityKey key, Throwable t) { public void onFailure(IntegrationActivityKey key, Throwable t) {
log.debug("[{}][{}] Failed to report last activity event in a period for device with id: [{}].", log.debug("[{}][{}] Failed to report last activity event in a period for device with id: [{}].",
activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId()); activityKey.getTenantId().getId(), name, activityKey.getDeviceId().getId());
} }
}); });
}
} }
activityStateWrapper.setAlreadyBeenReported(false); activityStateWrapper.setAlreadyBeenReported(false);
} }

View File

@ -785,7 +785,9 @@ state:
# If 'persistToTelemetry' is changed from 'false' to 'true': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_ts_view;' # If 'persistToTelemetry' is changed from 'false' to 'true': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_ts_view;'
# If 'persistToTelemetry' is changed from 'true' to 'false': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_attribute_view;' # If 'persistToTelemetry' is changed from 'true' to 'false': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_attribute_view;'
persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}" persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}"
# Time-to-live for device state telemetry data (e.g. 'active', 'lastActivityTime'). Used only when state.persistToTelemetry is set to 'true'. # Millisecond value defining time-to-live for device state telemetry data (e.g. 'active', 'lastActivityTime').
# Used only when state.persistToTelemetry is set to 'true' and Cassandra is used for timeseries data.
# 0 means time-to-live mechanism is disabled.
telemetryTtl: "${STATE_TELEMETRY_TTL:0}" telemetryTtl: "${STATE_TELEMETRY_TTL:0}"
# Tbel parameters # Tbel parameters
@ -870,11 +872,10 @@ transport:
report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}" report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}"
activity: activity:
# This property specifies the strategy for reporting activity events within each reporting period. # This property specifies the strategy for reporting activity events within each reporting period.
# The accepted values are 'first', 'last', and 'first and last'. # The accepted values are 'first', 'last', and 'first-and-last'.
# - 'first': Only the first activity event in each reporting period is reported. # - 'first': Only the first activity event in each reporting period is reported.
# - 'last': Only the last activity event in the reporting period is reported. # - 'last': Only the last activity event in the reporting period is reported.
# - 'first-and-last': Both the first and last activity events in the reporting period are reported. # - 'first-and-last': Both the first and last activity events in the reporting period are reported.
# - 'all': All activity events in the reporting period are reported.
reporting_strategy: "${TB_TRANSPORT_ACTIVITY_REPORTING_STRATEGY:last}" reporting_strategy: "${TB_TRANSPORT_ACTIVITY_REPORTING_STRATEGY:last}"
json: json:
# Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON # Cast String data types to Numeric if possible when processing Telemetry/Attributes JSON

View File

@ -136,6 +136,7 @@ public class DefaultTransportService implements TransportService {
public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime"; public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime";
public static final String SESSION_EXPIRED_MESSAGE = "Session has expired due to last activity time!"; public static final String SESSION_EXPIRED_MESSAGE = "Session has expired due to last activity time!";
private static final String ACTIVITY_MANAGER_NAME = "transport-activity-manager";
public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_OPEN = getSessionEventMsg(TransportProtos.SessionEvent.OPEN); public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_OPEN = getSessionEventMsg(TransportProtos.SessionEvent.OPEN);
public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_CLOSED = getSessionEventMsg(TransportProtos.SessionEvent.CLOSED); public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_CLOSED = getSessionEventMsg(TransportProtos.SessionEvent.CLOSED);
public static final TransportProtos.SessionCloseNotificationProto SESSION_EXPIRED_NOTIFICATION_PROTO = TransportProtos.SessionCloseNotificationProto.newBuilder() public static final TransportProtos.SessionCloseNotificationProto SESSION_EXPIRED_NOTIFICATION_PROTO = TransportProtos.SessionCloseNotificationProto.newBuilder()
@ -187,7 +188,6 @@ public class DefaultTransportService implements TransportService {
private final NotificationRuleProcessor notificationRuleProcessor; private final NotificationRuleProcessor notificationRuleProcessor;
private final EntityLimitsCache entityLimitsCache; private final EntityLimitsCache entityLimitsCache;
private ActivityManager<UUID, TransportActivityState> activityManager; private ActivityManager<UUID, TransportActivityState> activityManager;
private static final String ACTIVITY_MANAGER_NAME = "transport-activity-manager";
protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate; protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate;
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer; protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
@ -260,7 +260,7 @@ public class DefaultTransportService implements TransportService {
} }
private final ActivityStateReporter<UUID, TransportActivityState> activityReporter = (sessionId, timeToReport, state, reportCallback) -> { private final ActivityStateReporter<UUID, TransportActivityState> activityReporter = (sessionId, timeToReport, state, reportCallback) -> {
log.debug("[{}] Reporting activity state for key: [{}]. Time to report: [{}].", ACTIVITY_MANAGER_NAME, sessionId, timeToReport); log.debug("[{}] Reporting activity state for session with id: [{}]. Time to report: [{}].", ACTIVITY_MANAGER_NAME, sessionId, timeToReport);
SessionMetaData sessionMetaData = sessions.get(sessionId); SessionMetaData sessionMetaData = sessions.get(sessionId);
TransportProtos.SubscriptionInfoProto subscriptionInfo = TransportProtos.SubscriptionInfoProto.newBuilder() TransportProtos.SubscriptionInfoProto subscriptionInfo = TransportProtos.SubscriptionInfoProto.newBuilder()
.setAttributeSubscription(sessionMetaData != null && sessionMetaData.isSubscribedToAttributes()) .setAttributeSubscription(sessionMetaData != null && sessionMetaData.isSubscribedToAttributes())

View File

@ -49,9 +49,7 @@ import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.common.transport.service.TransportActivityState; import org.thingsboard.server.common.transport.service.TransportActivityState;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -129,7 +127,6 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage
@Override @Override
protected void doOnReportingPeriodEnd() { protected void doOnReportingPeriodEnd() {
Set<UUID> statesToRemove = new HashSet<>();
for (Map.Entry<UUID, ActivityStateWrapper> entry : states.entrySet()) { for (Map.Entry<UUID, ActivityStateWrapper> entry : states.entrySet()) {
var sessionId = entry.getKey(); var sessionId = entry.getKey();
var activityStateWrapper = entry.getValue(); var activityStateWrapper = entry.getValue();
@ -139,8 +136,8 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage
if (sessionMetaData != null) { if (sessionMetaData != null) {
activityState.setSessionInfoProto(sessionMetaData.getSessionInfo()); activityState.setSessionInfoProto(sessionMetaData.getSessionInfo());
} else { } else {
log.debug("[{}] Session with id: [{}] is not present. Marking it's activity state for removal.", name, sessionId); log.debug("[{}] Session with id: [{}] is not present. Removing it's activity state.", name, sessionId);
statesToRemove.add(sessionId); states.remove(sessionId);
} }
long lastActivityTime = activityState.getLastRecordedTime(); long lastActivityTime = activityState.getLastRecordedTime();
@ -161,8 +158,8 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage
long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout; long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout;
boolean hasExpired = sessionMetaData != null && lastActivityTime < expirationTime; boolean hasExpired = sessionMetaData != null && lastActivityTime < expirationTime;
if (hasExpired) { if (hasExpired) {
log.debug("[{}] Session with id: [{}] has expired due to last activity time: [{}]. Marking it's activity state for removal.", name, sessionId, lastActivityTime); log.debug("[{}] Session with id: [{}] has expired due to last activity time: [{}]. Removing it's activity state.", name, sessionId, lastActivityTime);
statesToRemove.add(sessionId); states.remove(sessionId);
transportService.deregisterSession(sessionInfo); transportService.deregisterSession(sessionInfo);
transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null); transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null);
sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO); sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO);
@ -183,7 +180,6 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage
} }
activityStateWrapper.setAlreadyBeenReported(false); activityStateWrapper.setAlreadyBeenReported(false);
} }
statesToRemove.forEach(states::remove);
} }
private void updateLastReportedTime(UUID key, long newLastReportedTime) { private void updateLastReportedTime(UUID key, long newLastReportedTime) {

View File

@ -49,9 +49,7 @@ import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.common.transport.service.TransportActivityState; import org.thingsboard.server.common.transport.service.TransportActivityState;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -129,7 +127,6 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager<U
@Override @Override
protected void doOnReportingPeriodEnd() { protected void doOnReportingPeriodEnd() {
Set<UUID> statesToRemove = new HashSet<>();
for (Map.Entry<UUID, ActivityStateWrapper> entry : states.entrySet()) { for (Map.Entry<UUID, ActivityStateWrapper> entry : states.entrySet()) {
var sessionId = entry.getKey(); var sessionId = entry.getKey();
var activityStateWrapper = entry.getValue(); var activityStateWrapper = entry.getValue();
@ -139,8 +136,8 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager<U
if (sessionMetaData != null) { if (sessionMetaData != null) {
activityState.setSessionInfoProto(sessionMetaData.getSessionInfo()); activityState.setSessionInfoProto(sessionMetaData.getSessionInfo());
} else { } else {
log.debug("[{}] Session with id: [{}] is not present. Marking it's activity state for removal.", name, sessionId); log.debug("[{}] Session with id: [{}] is not present. Removing it's activity state.", name, sessionId);
statesToRemove.add(sessionId); states.remove(sessionId);
} }
long lastActivityTime = activityState.getLastRecordedTime(); long lastActivityTime = activityState.getLastRecordedTime();
@ -161,8 +158,8 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager<U
long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout; long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout;
boolean hasExpired = sessionMetaData != null && lastActivityTime < expirationTime; boolean hasExpired = sessionMetaData != null && lastActivityTime < expirationTime;
if (hasExpired) { if (hasExpired) {
log.debug("[{}] Session with id: [{}] has expired due to last activity time: [{}]. Marking it's activity state for removal.", name, sessionId, lastActivityTime); log.debug("[{}] Session with id: [{}] has expired due to last activity time: [{}]. Removing it's activity state.", name, sessionId, lastActivityTime);
statesToRemove.add(sessionId); states.remove(sessionId);
transportService.deregisterSession(sessionInfo); transportService.deregisterSession(sessionInfo);
transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null); transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null);
sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO); sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO);
@ -184,7 +181,6 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager<U
} }
activityStateWrapper.setAlreadyBeenReported(false); activityStateWrapper.setAlreadyBeenReported(false);
} }
statesToRemove.forEach(states::remove);
} }
private void updateLastReportedTime(UUID key, long newLastReportedTime) { private void updateLastReportedTime(UUID key, long newLastReportedTime) {

View File

@ -42,9 +42,7 @@ import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.common.transport.service.TransportActivityState; import org.thingsboard.server.common.transport.service.TransportActivityState;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -82,7 +80,6 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager<UU
@Override @Override
protected void doOnReportingPeriodEnd() { protected void doOnReportingPeriodEnd() {
Set<UUID> statesToRemove = new HashSet<>();
for (Map.Entry<UUID, TransportActivityState> entry : states.entrySet()) { for (Map.Entry<UUID, TransportActivityState> entry : states.entrySet()) {
var sessionId = entry.getKey(); var sessionId = entry.getKey();
var activityState = entry.getValue(); var activityState = entry.getValue();
@ -91,8 +88,8 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager<UU
if (sessionMetaData != null) { if (sessionMetaData != null) {
activityState.setSessionInfoProto(sessionMetaData.getSessionInfo()); activityState.setSessionInfoProto(sessionMetaData.getSessionInfo());
} else { } else {
log.debug("[{}] Session with id: [{}] is not present. Marking it's activity state for removal.", name, sessionId); log.debug("[{}] Session with id: [{}] is not present. Removing it's activity state.", name, sessionId);
statesToRemove.add(sessionId); states.remove(sessionId);
} }
long lastActivityTime = activityState.getLastRecordedTime(); long lastActivityTime = activityState.getLastRecordedTime();
@ -113,8 +110,8 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager<UU
long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout; long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout;
boolean hasExpired = sessionMetaData != null && lastActivityTime < expirationTime; boolean hasExpired = sessionMetaData != null && lastActivityTime < expirationTime;
if (hasExpired) { if (hasExpired) {
log.debug("[{}] Session with id: [{}] has expired due to last activity time: [{}]. Marking it's activity state for removal.", name, sessionId, lastActivityTime); log.debug("[{}] Session with id: [{}] has expired due to last activity time: [{}]. Removing it's activity state.", name, sessionId, lastActivityTime);
statesToRemove.add(sessionId); states.remove(sessionId);
transportService.deregisterSession(sessionInfo); transportService.deregisterSession(sessionInfo);
transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null); transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null);
sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO); sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO);
@ -134,7 +131,6 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager<UU
}); });
} }
} }
statesToRemove.forEach(states::remove);
} }
private void updateLastReportedTime(UUID key, long newLastReportedTime) { private void updateLastReportedTime(UUID key, long newLastReportedTime) {