diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 3e60c4d49c..f39477b543 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -67,6 +67,7 @@ import org.thingsboard.server.queue.discovery.TbApplicationEventListener; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.service.executors.DbCallbackExecutorService; +import org.thingsboard.server.service.partition.AbstractPartitionBasedService; import org.thingsboard.server.service.telemetry.InternalTelemetryService; import javax.annotation.PostConstruct; @@ -93,7 +94,7 @@ import java.util.stream.Collectors; @Slf4j @Service -public class DefaultTbApiUsageStateService extends TbApplicationEventListener implements TbApiUsageStateService { +public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService implements TbApiUsageStateService { public static final String HOURLY = "Hourly"; public static final FutureCallback VOID_CALLBACK = new FutureCallback() { @@ -123,8 +124,6 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener otherUsageStates = new ConcurrentHashMap<>(); - final ConcurrentMap> partitionedEntities = new ConcurrentHashMap<>(); - final Set deletedEntities = Collections.newSetFromMap(new ConcurrentHashMap<>()); @Value("${usage.stats.report.enabled:true}") @@ -137,10 +136,6 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener> subscribeQueue = new ConcurrentLinkedQueue<>(); - public DefaultTbApiUsageStateService(TbClusterService clusterService, PartitionService partitionService, TenantService tenantService, @@ -162,7 +157,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener msg, TbCallback callback) { ToUsageStatsServiceMsg statsMsg = msg.getValue(); @@ -226,59 +226,6 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener partitions = getLatestPartitionsFromQueue(); - if (partitions == null) { - log.info("Api Usage state service. Nothing to do. Partitions are empty"); - return; - } - initStateFromDB(partitions); - } - - Set getLatestPartitionsFromQueue() { - log.debug("getLatestPartitionsFromQueue, queue size {}", subscribeQueue.size()); - Set partitions = null; - while (!subscribeQueue.isEmpty()) { - partitions = subscribeQueue.poll(); - log.debug("polled from the queue partitions {}", partitions); - } - log.debug("getLatestPartitionsFromQueue, partitions {}", partitions); - return partitions; - } - - private void initStateFromDB(Set partitions) { - try { - Set addedPartitions = new HashSet<>(partitions); - addedPartitions.removeAll(partitionedEntities.keySet()); - - Set removedPartitions = new HashSet<>(partitionedEntities.keySet()); - removedPartitions.removeAll(partitions); - - removedPartitions.forEach(partition -> { - Set entities = partitionedEntities.remove(partition); - entities.forEach(this::cleanUpEntitiesStateMap); - }); - - addedPartitions.forEach(tpi -> - partitionedEntities.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet())); - - otherUsageStates.entrySet().removeIf(entry -> - partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition()); - - initStatesFromDataBase(addedPartitions); - } catch (Throwable t) { - log.warn("Failed to init tenant states from DB", t); - } - } - @Override public ApiUsageState getApiUsageState(TenantId tenantId) { TenantApiUsageState tenantState = (TenantApiUsageState) myUsageStates.get(tenantId); @@ -401,7 +348,8 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener addedPartitions) { - if (addedPartitions.isEmpty()) { - return; - } + @Override + protected void onRepartitionEvent() { + otherUsageStates.entrySet().removeIf(entry -> + partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition()); + } + @Override + protected void onAddedPartitions(Set addedPartitions) { try { log.info("Initializing tenant states."); updateLock.lock(); @@ -595,11 +546,9 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener extends TbApplicationEventListener { + + protected final ConcurrentMap> partitionedEntities = new ConcurrentHashMap<>(); + final Queue> subscribeQueue = new ConcurrentLinkedQueue<>(); + + protected ListeningScheduledExecutorService scheduledExecutor; + + abstract protected String getSchedulerExecutorName(); + + abstract protected void onAddedPartitions(Set addedPartitions); + + abstract protected void cleanupEntityOnPartitionRemoval(T entityId); + + public Set getPartitionedEntities(TopicPartitionInfo tpi) { + return partitionedEntities.get(tpi); + } + + protected void init() { + // Should be always single threaded due to absence of locks. + scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state-scheduled"))); + } + + protected ServiceType getServiceType() { + return ServiceType.TB_CORE; + } + + protected void stop() { + if (scheduledExecutor != null) { + scheduledExecutor.shutdownNow(); + } + } + + /** + * DiscoveryService will call this event from the single thread (one-by-one). + * Events order is guaranteed by DiscoveryService. + * The only concurrency is expected from the [main] thread on Application started. + * Async implementation. Locks is not allowed by design. + * Any locks or delays in this module will affect DiscoveryService and entire system + */ + @Override + protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) { + if (getServiceType().equals(partitionChangeEvent.getServiceType())) { + log.debug("onTbApplicationEvent, processing event: {}", partitionChangeEvent); + subscribeQueue.add(partitionChangeEvent.getPartitions()); + scheduledExecutor.submit(this::pollInitStateFromDB); + } + } + + protected void pollInitStateFromDB() { + final Set partitions = getLatestPartitions(); + if (partitions == null) { + log.debug("Nothing to do. Partitions are empty."); + return; + } + initStateFromDB(partitions); + } + + private void initStateFromDB(Set partitions) { + try { + log.info("CURRENT PARTITIONS: {}", partitionedEntities.keySet()); + log.info("NEW PARTITIONS: {}", partitions); + + Set addedPartitions = new HashSet<>(partitions); + addedPartitions.removeAll(partitionedEntities.keySet()); + + log.info("ADDED PARTITIONS: {}", addedPartitions); + + Set removedPartitions = new HashSet<>(partitionedEntities.keySet()); + removedPartitions.removeAll(partitions); + + log.info("REMOVED PARTITIONS: {}", removedPartitions); + + // We no longer manage current partition of entities; + removedPartitions.forEach(partition -> { + Set entities = partitionedEntities.remove(partition); + entities.forEach(this::cleanupEntityOnPartitionRemoval); + }); + + onRepartitionEvent(); + + addedPartitions.forEach(tpi -> partitionedEntities.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet())); + + if (!addedPartitions.isEmpty()) { + onAddedPartitions(addedPartitions); + } + + scheduledExecutor.submit(() -> { + log.info("Managing following partitions:"); + partitionedEntities.forEach((tpi, entities) -> { + log.info("[{}]: {} entities", tpi.getFullTopicName(), entities.size()); + }); + }); + } catch (Throwable t) { + log.warn("Failed to init entities state from DB", t); + } + } + + protected void onRepartitionEvent() { + } + + private Set getLatestPartitions() { + log.debug("getLatestPartitionsFromQueue, queue size {}", subscribeQueue.size()); + Set partitions = null; + while (!subscribeQueue.isEmpty()) { + partitions = subscribeQueue.poll(); + log.debug("polled from the queue partitions {}", partitions); + } + log.debug("getLatestPartitionsFromQueue, partitions {}", partitions); + return partitions; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 8a24cdbf1a..1d6b0dcb4b 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -59,6 +59,7 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbApplicationEventListener; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.service.partition.AbstractPartitionBasedService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import javax.annotation.Nonnull; @@ -94,7 +95,7 @@ import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; @Service @TbCoreComponent @Slf4j -public class DefaultDeviceStateService extends TbApplicationEventListener implements DeviceStateService { +public class DefaultDeviceStateService extends AbstractPartitionBasedService implements DeviceStateService { public static final String ACTIVITY_STATE = "active"; public static final String LAST_CONNECT_TIME = "lastConnectTime"; @@ -131,12 +132,9 @@ public class DefaultDeviceStateService extends TbApplicationEventListener> partitionedDevices = new ConcurrentHashMap<>(); - final ConcurrentMap deviceStates = new ConcurrentHashMap<>(); - final Queue> subscribeQueue = new ConcurrentLinkedQueue<>(); + final ConcurrentMap deviceStates = new ConcurrentHashMap<>(); public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService, AttributesService attributesService, TimeseriesService tsService, @@ -156,21 +154,23 @@ public class DefaultDeviceStateService extends TbApplicationEventListener() { + Futures.addCallback(fetchDeviceState(device), new FutureCallback<>() { @Override public void onSuccess(@Nullable DeviceStateData state) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, device.getId()); - if (partitionedDevices.containsKey(tpi)) { + if (partitionedEntities.containsKey(tpi)) { addDeviceUsingState(tpi, state); save(deviceId, ACTIVITY_STATE, false); callback.onSuccess(); @@ -286,94 +286,14 @@ public class DefaultDeviceStateService extends TbApplicationEventListener partitions = getLatestPartitionsFromQueue(); - if (partitions == null) { - log.info("Device state service. Nothing to do. partitions is null"); - return; - } - initStateFromDB(partitions); - } - - // TODO: move to utils - Set getLatestPartitionsFromQueue() { - log.debug("getLatestPartitionsFromQueue, queue size {}", subscribeQueue.size()); - Set partitions = null; - while (!subscribeQueue.isEmpty()) { - partitions = subscribeQueue.poll(); - log.debug("polled from the queue partitions {}", partitions); - } - log.debug("getLatestPartitionsFromQueue, partitions {}", partitions); - return partitions; - } - - private void initStateFromDB(Set partitions) { - try { - log.info("CURRENT PARTITIONS: {}", partitionedDevices.keySet()); - log.info("NEW PARTITIONS: {}", partitions); - - Set addedPartitions = new HashSet<>(partitions); - addedPartitions.removeAll(partitionedDevices.keySet()); - - log.info("ADDED PARTITIONS: {}", addedPartitions); - - Set removedPartitions = new HashSet<>(partitionedDevices.keySet()); - removedPartitions.removeAll(partitions); - - log.info("REMOVED PARTITIONS: {}", removedPartitions); - - // We no longer manage current partition of devices; - removedPartitions.forEach(partition -> { - Set devices = partitionedDevices.remove(partition); - devices.forEach(this::cleanUpDeviceStateMap); - }); - - addedPartitions.forEach(tpi -> partitionedDevices.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet())); - - initPartitions(addedPartitions); - - scheduledExecutor.submit(() -> { - log.info("Managing following partitions:"); - partitionedDevices.forEach((tpi, devices) -> { - log.info("[{}]: {} devices", tpi.getFullTopicName(), devices.size()); - }); - }); - } catch (Throwable t) { - log.warn("Failed to init device states from DB", t); - } - } - - //TODO 3.0: replace this dummy search with new functionality to search by partitions using SQL capabilities. - //Adding only entities that are in new partitions - boolean initPartitions(Set addedPartitions) { - if (addedPartitions.isEmpty()) { - return false; - } - + protected void onAddedPartitions(Set addedPartitions) { List tenants = tenantService.findTenants(new PageLink(Integer.MAX_VALUE)).getData(); for (Tenant tenant : tenants) { log.debug("Finding devices for tenant [{}]", tenant.getName()); final PageLink pageLink = new PageLink(initFetchPackSize); scheduledExecutor.submit(() -> processPageAndSubmitNextPage(addedPartitions, tenant, pageLink, scheduledExecutor)); - } - return true; } private void processPageAndSubmitNextPage(final Set addedPartitions, final Tenant tenant, final PageLink pageLink, final ExecutorService executor) { @@ -435,7 +355,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener deviceIds = partitionedDevices.get(tpi); + Set deviceIds = partitionedEntities.get(tpi); if (deviceIds != null) { deviceIds.add(state.getDeviceId()); deviceStates.put(state.getDeviceId(), state); @@ -447,7 +367,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener { + partitionedEntities.forEach((tpi, deviceIds) -> { log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size()); for (DeviceId deviceId : deviceIds) { updateInactivityStateIfExpired(ts, deviceId); @@ -473,7 +393,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener deviceIdSet = partitionedDevices.get(tpi); - deviceIdSet.remove(deviceId); + Set deviceIdSet = partitionedEntities.get(tpi); + if (deviceIdSet != null) { + deviceIdSet.remove(deviceId); + } } - private void cleanUpDeviceStateMap(DeviceId deviceId) { + @Override + protected void cleanupEntityOnPartitionRemoval(DeviceId deviceId) { + cleanupEntity(deviceId); + } + + private void cleanupEntity(DeviceId deviceId) { deviceStates.remove(deviceId); } + private ListenableFuture fetchDeviceState(Device device) { ListenableFuture future; if (persistToTelemetry) { diff --git a/application/src/test/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateServiceTest.java b/application/src/test/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateServiceTest.java index b446e4adc0..06146d0982 100644 --- a/application/src/test/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateServiceTest.java @@ -60,8 +60,6 @@ public class DefaultTbApiUsageStateServiceTest { @Mock ApiUsageStateService apiUsageStateService; @Mock - SchedulerComponent scheduler; - @Mock TbTenantProfileCache tenantProfileCache; @Mock MailService mailService; @@ -74,7 +72,7 @@ public class DefaultTbApiUsageStateServiceTest { @Before public void setUp() { - service = spy(new DefaultTbApiUsageStateService(clusterService, partitionService, tenantService, tsService, apiUsageStateService, scheduler, tenantProfileCache, mailService, dbExecutor)); + service = spy(new DefaultTbApiUsageStateService(clusterService, partitionService, tenantService, tsService, apiUsageStateService, tenantProfileCache, mailService, dbExecutor)); } @Test @@ -94,7 +92,7 @@ public class DefaultTbApiUsageStateServiceTest { willReturn(tenantUsageStateMock).given(service).getOrFetchState(tenantId, tenantId); ApiUsageState tenantUsageState = service.getApiUsageState(tenantId); assertThat(tenantUsageState, is(tenantUsageStateMock.getApiUsageState())); - assertThat(true, is(service.partitionedEntities.get(tpi).contains(tenantId))); + assertThat(true, is(service.getPartitionedEntities(tpi).contains(tenantId))); Mockito.verify(service, times(1)).getOrFetchState(tenantId, tenantId); }