Issue 6056. Refactoring of duplicated code to abstract class
This commit is contained in:
parent
b33d28eedf
commit
878b75aff9
@ -67,6 +67,7 @@ import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
|
|||||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||||
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
|
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
|
||||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||||
|
import org.thingsboard.server.service.partition.AbstractPartitionBasedService;
|
||||||
import org.thingsboard.server.service.telemetry.InternalTelemetryService;
|
import org.thingsboard.server.service.telemetry.InternalTelemetryService;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
@ -93,7 +94,7 @@ import java.util.stream.Collectors;
|
|||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@Service
|
@Service
|
||||||
public class DefaultTbApiUsageStateService extends TbApplicationEventListener<PartitionChangeEvent> implements TbApiUsageStateService {
|
public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService<EntityId> implements TbApiUsageStateService {
|
||||||
|
|
||||||
public static final String HOURLY = "Hourly";
|
public static final String HOURLY = "Hourly";
|
||||||
public static final FutureCallback<Integer> VOID_CALLBACK = new FutureCallback<Integer>() {
|
public static final FutureCallback<Integer> VOID_CALLBACK = new FutureCallback<Integer>() {
|
||||||
@ -123,8 +124,6 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
|
|||||||
// Entities that should be processed on other servers
|
// Entities that should be processed on other servers
|
||||||
final Map<EntityId, ApiUsageState> otherUsageStates = new ConcurrentHashMap<>();
|
final Map<EntityId, ApiUsageState> otherUsageStates = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
final ConcurrentMap<TopicPartitionInfo, Set<EntityId>> partitionedEntities = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
final Set<EntityId> deletedEntities = Collections.newSetFromMap(new ConcurrentHashMap<>());
|
final Set<EntityId> deletedEntities = Collections.newSetFromMap(new ConcurrentHashMap<>());
|
||||||
|
|
||||||
@Value("${usage.stats.report.enabled:true}")
|
@Value("${usage.stats.report.enabled:true}")
|
||||||
@ -137,10 +136,6 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
|
|||||||
|
|
||||||
private final ExecutorService mailExecutor;
|
private final ExecutorService mailExecutor;
|
||||||
|
|
||||||
private ListeningScheduledExecutorService scheduledExecutor;
|
|
||||||
|
|
||||||
final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
|
|
||||||
|
|
||||||
public DefaultTbApiUsageStateService(TbClusterService clusterService,
|
public DefaultTbApiUsageStateService(TbClusterService clusterService,
|
||||||
PartitionService partitionService,
|
PartitionService partitionService,
|
||||||
TenantService tenantService,
|
TenantService tenantService,
|
||||||
@ -162,7 +157,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
|
|||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("api-usage-scheduled")));
|
super.init();
|
||||||
if (enabled) {
|
if (enabled) {
|
||||||
log.info("Starting api usage service.");
|
log.info("Starting api usage service.");
|
||||||
scheduledExecutor.scheduleAtFixedRate(this::checkStartOfNextCycle, nextCycleCheckInterval, nextCycleCheckInterval, TimeUnit.MILLISECONDS);
|
scheduledExecutor.scheduleAtFixedRate(this::checkStartOfNextCycle, nextCycleCheckInterval, nextCycleCheckInterval, TimeUnit.MILLISECONDS);
|
||||||
@ -170,6 +165,11 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getSchedulerExecutorName() {
|
||||||
|
return "api-usage-scheduled";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) {
|
public void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) {
|
||||||
ToUsageStatsServiceMsg statsMsg = msg.getValue();
|
ToUsageStatsServiceMsg statsMsg = msg.getValue();
|
||||||
@ -226,59 +226,6 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
|
|
||||||
if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) {
|
|
||||||
subscribeQueue.add(partitionChangeEvent.getPartitions());
|
|
||||||
scheduledExecutor.submit(this::pollInitStateFromDB);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void pollInitStateFromDB() {
|
|
||||||
final Set<TopicPartitionInfo> partitions = getLatestPartitionsFromQueue();
|
|
||||||
if (partitions == null) {
|
|
||||||
log.info("Api Usage state service. Nothing to do. Partitions are empty");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
initStateFromDB(partitions);
|
|
||||||
}
|
|
||||||
|
|
||||||
Set<TopicPartitionInfo> getLatestPartitionsFromQueue() {
|
|
||||||
log.debug("getLatestPartitionsFromQueue, queue size {}", subscribeQueue.size());
|
|
||||||
Set<TopicPartitionInfo> partitions = null;
|
|
||||||
while (!subscribeQueue.isEmpty()) {
|
|
||||||
partitions = subscribeQueue.poll();
|
|
||||||
log.debug("polled from the queue partitions {}", partitions);
|
|
||||||
}
|
|
||||||
log.debug("getLatestPartitionsFromQueue, partitions {}", partitions);
|
|
||||||
return partitions;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void initStateFromDB(Set<TopicPartitionInfo> partitions) {
|
|
||||||
try {
|
|
||||||
Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions);
|
|
||||||
addedPartitions.removeAll(partitionedEntities.keySet());
|
|
||||||
|
|
||||||
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(partitionedEntities.keySet());
|
|
||||||
removedPartitions.removeAll(partitions);
|
|
||||||
|
|
||||||
removedPartitions.forEach(partition -> {
|
|
||||||
Set<EntityId> entities = partitionedEntities.remove(partition);
|
|
||||||
entities.forEach(this::cleanUpEntitiesStateMap);
|
|
||||||
});
|
|
||||||
|
|
||||||
addedPartitions.forEach(tpi ->
|
|
||||||
partitionedEntities.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet()));
|
|
||||||
|
|
||||||
otherUsageStates.entrySet().removeIf(entry ->
|
|
||||||
partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition());
|
|
||||||
|
|
||||||
initStatesFromDataBase(addedPartitions);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
log.warn("Failed to init tenant states from DB", t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ApiUsageState getApiUsageState(TenantId tenantId) {
|
public ApiUsageState getApiUsageState(TenantId tenantId) {
|
||||||
TenantApiUsageState tenantState = (TenantApiUsageState) myUsageStates.get(tenantId);
|
TenantApiUsageState tenantState = (TenantApiUsageState) myUsageStates.get(tenantId);
|
||||||
@ -401,7 +348,8 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
|
|||||||
myUsageStates.remove(customerId);
|
myUsageStates.remove(customerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cleanUpEntitiesStateMap(EntityId entityId) {
|
@Override
|
||||||
|
protected void cleanupEntityOnPartitionRemoval(EntityId entityId) {
|
||||||
myUsageStates.remove(entityId);
|
myUsageStates.remove(entityId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -553,11 +501,14 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
|
|||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initStatesFromDataBase(Set<TopicPartitionInfo> addedPartitions) {
|
@Override
|
||||||
if (addedPartitions.isEmpty()) {
|
protected void onRepartitionEvent() {
|
||||||
return;
|
otherUsageStates.entrySet().removeIf(entry ->
|
||||||
}
|
partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
|
||||||
try {
|
try {
|
||||||
log.info("Initializing tenant states.");
|
log.info("Initializing tenant states.");
|
||||||
updateLock.lock();
|
updateLock.lock();
|
||||||
@ -595,11 +546,9 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
|
|||||||
|
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
private void destroy() {
|
private void destroy() {
|
||||||
|
super.stop();
|
||||||
if (mailExecutor != null) {
|
if (mailExecutor != null) {
|
||||||
mailExecutor.shutdownNow();
|
mailExecutor.shutdownNow();
|
||||||
}
|
}
|
||||||
if (scheduledExecutor != null) {
|
|
||||||
scheduledExecutor.shutdownNow();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,148 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2022 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.service.partition;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
|
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
|
||||||
|
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public abstract class AbstractPartitionBasedService<T extends EntityId> extends TbApplicationEventListener<PartitionChangeEvent> {
|
||||||
|
|
||||||
|
protected final ConcurrentMap<TopicPartitionInfo, Set<T>> partitionedEntities = new ConcurrentHashMap<>();
|
||||||
|
final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
|
||||||
|
|
||||||
|
protected ListeningScheduledExecutorService scheduledExecutor;
|
||||||
|
|
||||||
|
abstract protected String getSchedulerExecutorName();
|
||||||
|
|
||||||
|
abstract protected void onAddedPartitions(Set<TopicPartitionInfo> addedPartitions);
|
||||||
|
|
||||||
|
abstract protected void cleanupEntityOnPartitionRemoval(T entityId);
|
||||||
|
|
||||||
|
public Set<T> getPartitionedEntities(TopicPartitionInfo tpi) {
|
||||||
|
return partitionedEntities.get(tpi);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void init() {
|
||||||
|
// Should be always single threaded due to absence of locks.
|
||||||
|
scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state-scheduled")));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ServiceType getServiceType() {
|
||||||
|
return ServiceType.TB_CORE;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void stop() {
|
||||||
|
if (scheduledExecutor != null) {
|
||||||
|
scheduledExecutor.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DiscoveryService will call this event from the single thread (one-by-one).
|
||||||
|
* Events order is guaranteed by DiscoveryService.
|
||||||
|
* The only concurrency is expected from the [main] thread on Application started.
|
||||||
|
* Async implementation. Locks is not allowed by design.
|
||||||
|
* Any locks or delays in this module will affect DiscoveryService and entire system
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
|
||||||
|
if (getServiceType().equals(partitionChangeEvent.getServiceType())) {
|
||||||
|
log.debug("onTbApplicationEvent, processing event: {}", partitionChangeEvent);
|
||||||
|
subscribeQueue.add(partitionChangeEvent.getPartitions());
|
||||||
|
scheduledExecutor.submit(this::pollInitStateFromDB);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void pollInitStateFromDB() {
|
||||||
|
final Set<TopicPartitionInfo> partitions = getLatestPartitions();
|
||||||
|
if (partitions == null) {
|
||||||
|
log.debug("Nothing to do. Partitions are empty.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
initStateFromDB(partitions);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initStateFromDB(Set<TopicPartitionInfo> partitions) {
|
||||||
|
try {
|
||||||
|
log.info("CURRENT PARTITIONS: {}", partitionedEntities.keySet());
|
||||||
|
log.info("NEW PARTITIONS: {}", partitions);
|
||||||
|
|
||||||
|
Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions);
|
||||||
|
addedPartitions.removeAll(partitionedEntities.keySet());
|
||||||
|
|
||||||
|
log.info("ADDED PARTITIONS: {}", addedPartitions);
|
||||||
|
|
||||||
|
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(partitionedEntities.keySet());
|
||||||
|
removedPartitions.removeAll(partitions);
|
||||||
|
|
||||||
|
log.info("REMOVED PARTITIONS: {}", removedPartitions);
|
||||||
|
|
||||||
|
// We no longer manage current partition of entities;
|
||||||
|
removedPartitions.forEach(partition -> {
|
||||||
|
Set<T> entities = partitionedEntities.remove(partition);
|
||||||
|
entities.forEach(this::cleanupEntityOnPartitionRemoval);
|
||||||
|
});
|
||||||
|
|
||||||
|
onRepartitionEvent();
|
||||||
|
|
||||||
|
addedPartitions.forEach(tpi -> partitionedEntities.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet()));
|
||||||
|
|
||||||
|
if (!addedPartitions.isEmpty()) {
|
||||||
|
onAddedPartitions(addedPartitions);
|
||||||
|
}
|
||||||
|
|
||||||
|
scheduledExecutor.submit(() -> {
|
||||||
|
log.info("Managing following partitions:");
|
||||||
|
partitionedEntities.forEach((tpi, entities) -> {
|
||||||
|
log.info("[{}]: {} entities", tpi.getFullTopicName(), entities.size());
|
||||||
|
});
|
||||||
|
});
|
||||||
|
} catch (Throwable t) {
|
||||||
|
log.warn("Failed to init entities state from DB", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void onRepartitionEvent() {
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<TopicPartitionInfo> getLatestPartitions() {
|
||||||
|
log.debug("getLatestPartitionsFromQueue, queue size {}", subscribeQueue.size());
|
||||||
|
Set<TopicPartitionInfo> partitions = null;
|
||||||
|
while (!subscribeQueue.isEmpty()) {
|
||||||
|
partitions = subscribeQueue.poll();
|
||||||
|
log.debug("polled from the queue partitions {}", partitions);
|
||||||
|
}
|
||||||
|
log.debug("getLatestPartitionsFromQueue, partitions {}", partitions);
|
||||||
|
return partitions;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -59,6 +59,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
|
|||||||
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
|
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
|
||||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||||
import org.thingsboard.server.cluster.TbClusterService;
|
import org.thingsboard.server.cluster.TbClusterService;
|
||||||
|
import org.thingsboard.server.service.partition.AbstractPartitionBasedService;
|
||||||
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
|
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
@ -94,7 +95,7 @@ import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
|
|||||||
@Service
|
@Service
|
||||||
@TbCoreComponent
|
@TbCoreComponent
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class DefaultDeviceStateService extends TbApplicationEventListener<PartitionChangeEvent> implements DeviceStateService {
|
public class DefaultDeviceStateService extends AbstractPartitionBasedService<DeviceId> implements DeviceStateService {
|
||||||
|
|
||||||
public static final String ACTIVITY_STATE = "active";
|
public static final String ACTIVITY_STATE = "active";
|
||||||
public static final String LAST_CONNECT_TIME = "lastConnectTime";
|
public static final String LAST_CONNECT_TIME = "lastConnectTime";
|
||||||
@ -131,12 +132,9 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
|||||||
@Getter
|
@Getter
|
||||||
private int initFetchPackSize;
|
private int initFetchPackSize;
|
||||||
|
|
||||||
private ListeningScheduledExecutorService scheduledExecutor;
|
|
||||||
private ExecutorService deviceStateExecutor;
|
private ExecutorService deviceStateExecutor;
|
||||||
private final ConcurrentMap<TopicPartitionInfo, Set<DeviceId>> partitionedDevices = new ConcurrentHashMap<>();
|
|
||||||
final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
|
final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService,
|
public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService,
|
||||||
AttributesService attributesService, TimeseriesService tsService,
|
AttributesService attributesService, TimeseriesService tsService,
|
||||||
@ -156,21 +154,23 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
|||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
|
super.init();
|
||||||
deviceStateExecutor = Executors.newFixedThreadPool(
|
deviceStateExecutor = Executors.newFixedThreadPool(
|
||||||
Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("device-state"));
|
Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("device-state"));
|
||||||
// Should be always single threaded due to absence of locks.
|
|
||||||
scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state-scheduled")));
|
|
||||||
scheduledExecutor.scheduleAtFixedRate(this::updateInactivityStateIfExpired, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
|
scheduledExecutor.scheduleAtFixedRate(this::updateInactivityStateIfExpired, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@PreDestroy
|
@PreDestroy
|
||||||
public void stop() {
|
public void stop() {
|
||||||
|
super.stop();
|
||||||
if (deviceStateExecutor != null) {
|
if (deviceStateExecutor != null) {
|
||||||
deviceStateExecutor.shutdownNow();
|
deviceStateExecutor.shutdownNow();
|
||||||
}
|
}
|
||||||
if (scheduledExecutor != null) {
|
}
|
||||||
scheduledExecutor.shutdownNow();
|
|
||||||
}
|
@Override
|
||||||
|
protected String getSchedulerExecutorName() {
|
||||||
|
return "device-state-scheduled";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -208,7 +208,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.debug("updateActivityState - fetched state IN NULL for device {}, lastReportedActivity {}", deviceId, lastReportedActivity);
|
log.debug("updateActivityState - fetched state IN NULL for device {}, lastReportedActivity {}", deviceId, lastReportedActivity);
|
||||||
cleanUpDeviceStateMap(deviceId);
|
cleanupEntity(deviceId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -246,11 +246,11 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
|||||||
Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
|
Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
|
||||||
if (device != null) {
|
if (device != null) {
|
||||||
if (proto.getAdded()) {
|
if (proto.getAdded()) {
|
||||||
Futures.addCallback(fetchDeviceState(device), new FutureCallback<DeviceStateData>() {
|
Futures.addCallback(fetchDeviceState(device), new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable DeviceStateData state) {
|
public void onSuccess(@Nullable DeviceStateData state) {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, device.getId());
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, device.getId());
|
||||||
if (partitionedDevices.containsKey(tpi)) {
|
if (partitionedEntities.containsKey(tpi)) {
|
||||||
addDeviceUsingState(tpi, state);
|
addDeviceUsingState(tpi, state);
|
||||||
save(deviceId, ACTIVITY_STATE, false);
|
save(deviceId, ACTIVITY_STATE, false);
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
@ -286,94 +286,14 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* DiscoveryService will call this event from the single thread (one-by-one).
|
|
||||||
* Events order is guaranteed by DiscoveryService.
|
|
||||||
* The only concurrency is expected from the [main] thread on Application started.
|
|
||||||
* Async implementation. Locks is not allowed by design.
|
|
||||||
* Any locks or delays in this module will affect DiscoveryService and entire system
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
|
protected void onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
|
||||||
if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
|
|
||||||
log.debug("onTbApplicationEvent ServiceType is TB_CORE, processing queue {}", partitionChangeEvent);
|
|
||||||
subscribeQueue.add(partitionChangeEvent.getPartitions());
|
|
||||||
scheduledExecutor.submit(this::pollInitStateFromDB);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void pollInitStateFromDB() {
|
|
||||||
final Set<TopicPartitionInfo> partitions = getLatestPartitionsFromQueue();
|
|
||||||
if (partitions == null) {
|
|
||||||
log.info("Device state service. Nothing to do. partitions is null");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
initStateFromDB(partitions);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: move to utils
|
|
||||||
Set<TopicPartitionInfo> getLatestPartitionsFromQueue() {
|
|
||||||
log.debug("getLatestPartitionsFromQueue, queue size {}", subscribeQueue.size());
|
|
||||||
Set<TopicPartitionInfo> partitions = null;
|
|
||||||
while (!subscribeQueue.isEmpty()) {
|
|
||||||
partitions = subscribeQueue.poll();
|
|
||||||
log.debug("polled from the queue partitions {}", partitions);
|
|
||||||
}
|
|
||||||
log.debug("getLatestPartitionsFromQueue, partitions {}", partitions);
|
|
||||||
return partitions;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void initStateFromDB(Set<TopicPartitionInfo> partitions) {
|
|
||||||
try {
|
|
||||||
log.info("CURRENT PARTITIONS: {}", partitionedDevices.keySet());
|
|
||||||
log.info("NEW PARTITIONS: {}", partitions);
|
|
||||||
|
|
||||||
Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions);
|
|
||||||
addedPartitions.removeAll(partitionedDevices.keySet());
|
|
||||||
|
|
||||||
log.info("ADDED PARTITIONS: {}", addedPartitions);
|
|
||||||
|
|
||||||
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(partitionedDevices.keySet());
|
|
||||||
removedPartitions.removeAll(partitions);
|
|
||||||
|
|
||||||
log.info("REMOVED PARTITIONS: {}", removedPartitions);
|
|
||||||
|
|
||||||
// We no longer manage current partition of devices;
|
|
||||||
removedPartitions.forEach(partition -> {
|
|
||||||
Set<DeviceId> devices = partitionedDevices.remove(partition);
|
|
||||||
devices.forEach(this::cleanUpDeviceStateMap);
|
|
||||||
});
|
|
||||||
|
|
||||||
addedPartitions.forEach(tpi -> partitionedDevices.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet()));
|
|
||||||
|
|
||||||
initPartitions(addedPartitions);
|
|
||||||
|
|
||||||
scheduledExecutor.submit(() -> {
|
|
||||||
log.info("Managing following partitions:");
|
|
||||||
partitionedDevices.forEach((tpi, devices) -> {
|
|
||||||
log.info("[{}]: {} devices", tpi.getFullTopicName(), devices.size());
|
|
||||||
});
|
|
||||||
});
|
|
||||||
} catch (Throwable t) {
|
|
||||||
log.warn("Failed to init device states from DB", t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//TODO 3.0: replace this dummy search with new functionality to search by partitions using SQL capabilities.
|
|
||||||
//Adding only entities that are in new partitions
|
|
||||||
boolean initPartitions(Set<TopicPartitionInfo> addedPartitions) {
|
|
||||||
if (addedPartitions.isEmpty()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<Tenant> tenants = tenantService.findTenants(new PageLink(Integer.MAX_VALUE)).getData();
|
List<Tenant> tenants = tenantService.findTenants(new PageLink(Integer.MAX_VALUE)).getData();
|
||||||
for (Tenant tenant : tenants) {
|
for (Tenant tenant : tenants) {
|
||||||
log.debug("Finding devices for tenant [{}]", tenant.getName());
|
log.debug("Finding devices for tenant [{}]", tenant.getName());
|
||||||
final PageLink pageLink = new PageLink(initFetchPackSize);
|
final PageLink pageLink = new PageLink(initFetchPackSize);
|
||||||
scheduledExecutor.submit(() -> processPageAndSubmitNextPage(addedPartitions, tenant, pageLink, scheduledExecutor));
|
scheduledExecutor.submit(() -> processPageAndSubmitNextPage(addedPartitions, tenant, pageLink, scheduledExecutor));
|
||||||
|
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void processPageAndSubmitNextPage(final Set<TopicPartitionInfo> addedPartitions, final Tenant tenant, final PageLink pageLink, final ExecutorService executor) {
|
private void processPageAndSubmitNextPage(final Set<TopicPartitionInfo> addedPartitions, final Tenant tenant, final PageLink pageLink, final ExecutorService executor) {
|
||||||
@ -435,7 +355,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) {
|
private void addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) {
|
||||||
Set<DeviceId> deviceIds = partitionedDevices.get(tpi);
|
Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
|
||||||
if (deviceIds != null) {
|
if (deviceIds != null) {
|
||||||
deviceIds.add(state.getDeviceId());
|
deviceIds.add(state.getDeviceId());
|
||||||
deviceStates.put(state.getDeviceId(), state);
|
deviceStates.put(state.getDeviceId(), state);
|
||||||
@ -447,7 +367,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
|||||||
|
|
||||||
void updateInactivityStateIfExpired() {
|
void updateInactivityStateIfExpired() {
|
||||||
final long ts = System.currentTimeMillis();
|
final long ts = System.currentTimeMillis();
|
||||||
partitionedDevices.forEach((tpi, deviceIds) -> {
|
partitionedEntities.forEach((tpi, deviceIds) -> {
|
||||||
log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size());
|
log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size());
|
||||||
for (DeviceId deviceId : deviceIds) {
|
for (DeviceId deviceId : deviceIds) {
|
||||||
updateInactivityStateIfExpired(ts, deviceId);
|
updateInactivityStateIfExpired(ts, deviceId);
|
||||||
@ -473,7 +393,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);
|
log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);
|
||||||
cleanUpDeviceStateMap(deviceId);
|
cleanupEntity(deviceId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -508,24 +428,32 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
|||||||
|
|
||||||
private void cleanDeviceStateIfBelongsExternalPartition(TenantId tenantId, final DeviceId deviceId) {
|
private void cleanDeviceStateIfBelongsExternalPartition(TenantId tenantId, final DeviceId deviceId) {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
|
||||||
if (!partitionedDevices.containsKey(tpi)) {
|
if (!partitionedEntities.containsKey(tpi)) {
|
||||||
cleanUpDeviceStateMap(deviceId);
|
cleanupEntity(deviceId);
|
||||||
log.debug("[{}][{}] device belongs to external partition. Probably rebalancing is in progress. Topic: {}"
|
log.debug("[{}][{}] device belongs to external partition. Probably rebalancing is in progress. Topic: {}"
|
||||||
, tenantId, deviceId, tpi.getFullTopicName());
|
, tenantId, deviceId, tpi.getFullTopicName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
|
private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
|
||||||
cleanUpDeviceStateMap(deviceId);
|
cleanupEntity(deviceId);
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
|
||||||
Set<DeviceId> deviceIdSet = partitionedDevices.get(tpi);
|
Set<DeviceId> deviceIdSet = partitionedEntities.get(tpi);
|
||||||
deviceIdSet.remove(deviceId);
|
if (deviceIdSet != null) {
|
||||||
|
deviceIdSet.remove(deviceId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cleanUpDeviceStateMap(DeviceId deviceId) {
|
@Override
|
||||||
|
protected void cleanupEntityOnPartitionRemoval(DeviceId deviceId) {
|
||||||
|
cleanupEntity(deviceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanupEntity(DeviceId deviceId) {
|
||||||
deviceStates.remove(deviceId);
|
deviceStates.remove(deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
|
private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
|
||||||
ListenableFuture<DeviceStateData> future;
|
ListenableFuture<DeviceStateData> future;
|
||||||
if (persistToTelemetry) {
|
if (persistToTelemetry) {
|
||||||
|
|||||||
@ -60,8 +60,6 @@ public class DefaultTbApiUsageStateServiceTest {
|
|||||||
@Mock
|
@Mock
|
||||||
ApiUsageStateService apiUsageStateService;
|
ApiUsageStateService apiUsageStateService;
|
||||||
@Mock
|
@Mock
|
||||||
SchedulerComponent scheduler;
|
|
||||||
@Mock
|
|
||||||
TbTenantProfileCache tenantProfileCache;
|
TbTenantProfileCache tenantProfileCache;
|
||||||
@Mock
|
@Mock
|
||||||
MailService mailService;
|
MailService mailService;
|
||||||
@ -74,7 +72,7 @@ public class DefaultTbApiUsageStateServiceTest {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
service = spy(new DefaultTbApiUsageStateService(clusterService, partitionService, tenantService, tsService, apiUsageStateService, scheduler, tenantProfileCache, mailService, dbExecutor));
|
service = spy(new DefaultTbApiUsageStateService(clusterService, partitionService, tenantService, tsService, apiUsageStateService, tenantProfileCache, mailService, dbExecutor));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -94,7 +92,7 @@ public class DefaultTbApiUsageStateServiceTest {
|
|||||||
willReturn(tenantUsageStateMock).given(service).getOrFetchState(tenantId, tenantId);
|
willReturn(tenantUsageStateMock).given(service).getOrFetchState(tenantId, tenantId);
|
||||||
ApiUsageState tenantUsageState = service.getApiUsageState(tenantId);
|
ApiUsageState tenantUsageState = service.getApiUsageState(tenantId);
|
||||||
assertThat(tenantUsageState, is(tenantUsageStateMock.getApiUsageState()));
|
assertThat(tenantUsageState, is(tenantUsageStateMock.getApiUsageState()));
|
||||||
assertThat(true, is(service.partitionedEntities.get(tpi).contains(tenantId)));
|
assertThat(true, is(service.getPartitionedEntities(tpi).contains(tenantId)));
|
||||||
Mockito.verify(service, times(1)).getOrFetchState(tenantId, tenantId);
|
Mockito.verify(service, times(1)).getOrFetchState(tenantId, tenantId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user