Merge pull request #5738 from smatvienko-tb/device_actor_do_not_dump_sessions_for_loacl_cache
device actor - do not dump/restore device session for local cache
This commit is contained in:
commit
cf2d8c6888
@ -36,6 +36,7 @@ import org.thingsboard.rule.engine.api.SmsService;
|
|||||||
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
|
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
|
||||||
import org.thingsboard.server.actors.service.ActorService;
|
import org.thingsboard.server.actors.service.ActorService;
|
||||||
import org.thingsboard.server.actors.tenant.DebugTbRateLimits;
|
import org.thingsboard.server.actors.tenant.DebugTbRateLimits;
|
||||||
|
import org.thingsboard.server.cluster.TbClusterService;
|
||||||
import org.thingsboard.server.common.data.DataConstants;
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
import org.thingsboard.server.common.data.Event;
|
import org.thingsboard.server.common.data.Event;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
@ -82,7 +83,6 @@ import org.thingsboard.server.service.executors.ExternalCallExecutorService;
|
|||||||
import org.thingsboard.server.service.executors.SharedEventLoopGroupService;
|
import org.thingsboard.server.service.executors.SharedEventLoopGroupService;
|
||||||
import org.thingsboard.server.service.mail.MailExecutorService;
|
import org.thingsboard.server.service.mail.MailExecutorService;
|
||||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||||
import org.thingsboard.server.cluster.TbClusterService;
|
|
||||||
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
|
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
|
||||||
import org.thingsboard.server.service.rpc.TbRpcService;
|
import org.thingsboard.server.service.rpc.TbRpcService;
|
||||||
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
|
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
|
||||||
@ -95,6 +95,7 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
|
|||||||
import org.thingsboard.server.service.transport.TbCoreToTransportService;
|
import org.thingsboard.server.service.transport.TbCoreToTransportService;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.io.StringWriter;
|
import java.io.StringWriter;
|
||||||
@ -333,30 +334,42 @@ public class ActorSystemContext {
|
|||||||
@Getter
|
@Getter
|
||||||
private long maxConcurrentSessionsPerDevice;
|
private long maxConcurrentSessionsPerDevice;
|
||||||
|
|
||||||
@Value("${actors.session.sync.timeout}")
|
@Value("${actors.session.sync.timeout:10000}")
|
||||||
@Getter
|
@Getter
|
||||||
private long syncSessionTimeout;
|
private long syncSessionTimeout;
|
||||||
|
|
||||||
@Value("${actors.rule.chain.error_persist_frequency}")
|
@Value("${actors.rule.chain.error_persist_frequency:3000}")
|
||||||
@Getter
|
@Getter
|
||||||
private long ruleChainErrorPersistFrequency;
|
private long ruleChainErrorPersistFrequency;
|
||||||
|
|
||||||
@Value("${actors.rule.node.error_persist_frequency}")
|
@Value("${actors.rule.node.error_persist_frequency:3000}")
|
||||||
@Getter
|
@Getter
|
||||||
private long ruleNodeErrorPersistFrequency;
|
private long ruleNodeErrorPersistFrequency;
|
||||||
|
|
||||||
@Value("${actors.statistics.enabled}")
|
@Value("${actors.statistics.enabled:true}")
|
||||||
@Getter
|
@Getter
|
||||||
private boolean statisticsEnabled;
|
private boolean statisticsEnabled;
|
||||||
|
|
||||||
@Value("${actors.statistics.persist_frequency}")
|
@Value("${actors.statistics.persist_frequency:3600000}")
|
||||||
@Getter
|
@Getter
|
||||||
private long statisticsPersistFrequency;
|
private long statisticsPersistFrequency;
|
||||||
|
|
||||||
@Value("${edges.enabled}")
|
@Value("${edges.enabled:true}")
|
||||||
@Getter
|
@Getter
|
||||||
private boolean edgesEnabled;
|
private boolean edgesEnabled;
|
||||||
|
|
||||||
|
@Value("${cache.type:caffeine}")
|
||||||
|
@Getter
|
||||||
|
private String cacheType;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
private boolean localCacheType;
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init() {
|
||||||
|
this.localCacheType = "caffeine".equals(cacheType);
|
||||||
|
}
|
||||||
|
|
||||||
@Scheduled(fixedDelayString = "${actors.statistics.js_print_interval_ms}")
|
@Scheduled(fixedDelayString = "${actors.statistics.js_print_interval_ms}")
|
||||||
public void printStats() {
|
public void printStats() {
|
||||||
if (statisticsEnabled) {
|
if (statisticsEnabled) {
|
||||||
@ -368,31 +381,31 @@ public class ActorSystemContext {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Value("${actors.tenant.create_components_on_init}")
|
@Value("${actors.tenant.create_components_on_init:true}")
|
||||||
@Getter
|
@Getter
|
||||||
private boolean tenantComponentsInitEnabled;
|
private boolean tenantComponentsInitEnabled;
|
||||||
|
|
||||||
@Value("${actors.rule.allow_system_mail_service}")
|
@Value("${actors.rule.allow_system_mail_service:true}")
|
||||||
@Getter
|
@Getter
|
||||||
private boolean allowSystemMailService;
|
private boolean allowSystemMailService;
|
||||||
|
|
||||||
@Value("${actors.rule.allow_system_sms_service}")
|
@Value("${actors.rule.allow_system_sms_service:true}")
|
||||||
@Getter
|
@Getter
|
||||||
private boolean allowSystemSmsService;
|
private boolean allowSystemSmsService;
|
||||||
|
|
||||||
@Value("${transport.sessions.inactivity_timeout}")
|
@Value("${transport.sessions.inactivity_timeout:300000}")
|
||||||
@Getter
|
@Getter
|
||||||
private long sessionInactivityTimeout;
|
private long sessionInactivityTimeout;
|
||||||
|
|
||||||
@Value("${transport.sessions.report_timeout}")
|
@Value("${transport.sessions.report_timeout:3000}")
|
||||||
@Getter
|
@Getter
|
||||||
private long sessionReportTimeout;
|
private long sessionReportTimeout;
|
||||||
|
|
||||||
@Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.enabled}")
|
@Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.enabled:true}")
|
||||||
@Getter
|
@Getter
|
||||||
private boolean debugPerTenantEnabled;
|
private boolean debugPerTenantEnabled;
|
||||||
|
|
||||||
@Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.configuration}")
|
@Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.configuration:50000:3600}")
|
||||||
@Getter
|
@Getter
|
||||||
private String debugPerTenantLimitsConfiguration;
|
private String debugPerTenantLimitsConfiguration;
|
||||||
|
|
||||||
|
|||||||
@ -871,6 +871,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void restoreSessions() {
|
void restoreSessions() {
|
||||||
|
if (systemContext.isLocalCacheType()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
log.debug("[{}] Restoring sessions from cache", deviceId);
|
log.debug("[{}] Restoring sessions from cache", deviceId);
|
||||||
DeviceSessionsCacheEntry sessionsDump = null;
|
DeviceSessionsCacheEntry sessionsDump = null;
|
||||||
try {
|
try {
|
||||||
@ -905,6 +908,9 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void dumpSessions() {
|
private void dumpSessions() {
|
||||||
|
if (systemContext.isLocalCacheType()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
|
log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
|
||||||
List<SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
|
List<SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
|
||||||
sessions.forEach((uuid, sessionMD) -> {
|
sessions.forEach((uuid, sessionMD) -> {
|
||||||
|
|||||||
@ -0,0 +1,257 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2021 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.actors;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
|
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
|
import org.springframework.test.context.TestPropertySource;
|
||||||
|
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||||
|
import org.thingsboard.rule.engine.api.MailService;
|
||||||
|
import org.thingsboard.rule.engine.api.SmsService;
|
||||||
|
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
|
||||||
|
import org.thingsboard.server.actors.service.ActorService;
|
||||||
|
import org.thingsboard.server.cluster.TbClusterService;
|
||||||
|
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
|
||||||
|
import org.thingsboard.server.dao.asset.AssetService;
|
||||||
|
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||||
|
import org.thingsboard.server.dao.audit.AuditLogService;
|
||||||
|
import org.thingsboard.server.dao.cassandra.CassandraCluster;
|
||||||
|
import org.thingsboard.server.dao.customer.CustomerService;
|
||||||
|
import org.thingsboard.server.dao.dashboard.DashboardService;
|
||||||
|
import org.thingsboard.server.dao.device.ClaimDevicesService;
|
||||||
|
import org.thingsboard.server.dao.device.DeviceService;
|
||||||
|
import org.thingsboard.server.dao.edge.EdgeEventService;
|
||||||
|
import org.thingsboard.server.dao.edge.EdgeService;
|
||||||
|
import org.thingsboard.server.dao.entityview.EntityViewService;
|
||||||
|
import org.thingsboard.server.dao.event.EventService;
|
||||||
|
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor;
|
||||||
|
import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor;
|
||||||
|
import org.thingsboard.server.dao.ota.OtaPackageService;
|
||||||
|
import org.thingsboard.server.dao.relation.RelationService;
|
||||||
|
import org.thingsboard.server.dao.resource.ResourceService;
|
||||||
|
import org.thingsboard.server.dao.rule.RuleChainService;
|
||||||
|
import org.thingsboard.server.dao.rule.RuleNodeStateService;
|
||||||
|
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||||
|
import org.thingsboard.server.dao.tenant.TenantProfileService;
|
||||||
|
import org.thingsboard.server.dao.tenant.TenantService;
|
||||||
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
|
import org.thingsboard.server.dao.user.UserService;
|
||||||
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
|
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||||
|
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
|
||||||
|
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
|
||||||
|
import org.thingsboard.server.service.component.ComponentDiscoveryService;
|
||||||
|
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
|
||||||
|
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||||
|
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
|
||||||
|
import org.thingsboard.server.service.executors.SharedEventLoopGroupService;
|
||||||
|
import org.thingsboard.server.service.mail.MailExecutorService;
|
||||||
|
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||||
|
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
|
||||||
|
import org.thingsboard.server.service.rpc.TbRpcService;
|
||||||
|
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
|
||||||
|
import org.thingsboard.server.service.script.JsInvokeService;
|
||||||
|
import org.thingsboard.server.service.session.DeviceSessionCacheService;
|
||||||
|
import org.thingsboard.server.service.sms.SmsExecutorService;
|
||||||
|
import org.thingsboard.server.service.state.DeviceStateService;
|
||||||
|
import org.thingsboard.server.service.telemetry.AlarmSubscriptionService;
|
||||||
|
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
|
||||||
|
import org.thingsboard.server.service.transport.TbCoreToTransportService;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
@ExtendWith(SpringExtension.class)
|
||||||
|
@ContextConfiguration(classes = ActorSystemContext.class)
|
||||||
|
@EnableConfigurationProperties
|
||||||
|
@TestPropertySource(properties = {
|
||||||
|
"cache.type=caffeine",
|
||||||
|
})
|
||||||
|
public class ActorSystemContextTest {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
ActorSystemContext ctx;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TbApiUsageStateService apiUsageStateService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TbApiUsageClient apiUsageClient;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TbServiceInfoProvider serviceInfoProvider;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private ActorService actorService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private ComponentDiscoveryService componentService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private DataDecodingEncodingService encodingService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private DeviceService deviceService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TbTenantProfileCache tenantProfileCache;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TbDeviceProfileCache deviceProfileCache;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private AssetService assetService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private DashboardService dashboardService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TenantService tenantService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TenantProfileService tenantProfileService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private CustomerService customerService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private UserService userService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private RuleChainService ruleChainService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private RuleNodeStateService ruleNodeStateService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private PartitionService partitionService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TbClusterService clusterService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TimeseriesService tsService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private AttributesService attributesService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private EventService eventService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private RelationService relationService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private AuditLogService auditLogService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private EntityViewService entityViewService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TelemetrySubscriptionService tsSubService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private AlarmSubscriptionService alarmService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private JsInvokeService jsSandbox;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private MailExecutorService mailExecutor;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private SmsExecutorService smsExecutor;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private DbCallbackExecutorService dbCallbackExecutor;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private ExternalCallExecutorService externalCallExecutorService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private SharedEventLoopGroupService sharedEventLoopGroupService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private MailService mailService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private SmsService smsService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private SmsSenderFactory smsSenderFactory;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private ClaimDevicesService claimDevicesService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private JsInvokeStats jsInvokeStats;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private DeviceStateService deviceStateService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private DeviceSessionCacheService deviceSessionCacheService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TbCoreToTransportService tbCoreToTransportService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TbRuleEngineDeviceRpcService tbRuleEngineDeviceRpcService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TbCoreDeviceRpcService tbCoreDeviceRpcService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private EdgeService edgeService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private EdgeEventService edgeEventService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private EdgeRpcService edgeRpcService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private ResourceService resourceService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private OtaPackageService otaPackageService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private TbRpcService tbRpcService;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private CassandraCluster cassandraCluster;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private CassandraBufferedRateReadExecutor cassandraBufferedRateReadExecutor;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private CassandraBufferedRateWriteExecutor cassandraBufferedRateWriteExecutor;
|
||||||
|
|
||||||
|
@MockBean
|
||||||
|
private RedisTemplate<String, Object> redisTemplate;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenCaffeineCache_whenInit_thenIsLocalCacheTrue() {
|
||||||
|
assertThat(ctx.getCacheType()).isEqualTo("caffeine");
|
||||||
|
assertThat(ctx.isLocalCacheType()).as("caffeine is the local cache type").isTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user