diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 733cf75e64..28aaa583ab 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -80,6 +80,7 @@ import org.thingsboard.server.queue.usagestats.TbApiUsageClient; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.edge.rpc.EdgeRpcService; +import org.thingsboard.server.service.entitiy.entityView.TbEntityViewService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.ExternalCallExecutorService; import org.thingsboard.server.service.executors.SharedEventLoopGroupService; @@ -220,6 +221,11 @@ public class ActorSystemContext { @Getter private EntityViewService entityViewService; + @Lazy + @Autowired(required = false) + @Getter + private TbEntityViewService tbEntityViewService; + @Autowired @Getter private TelemetrySubscriptionService tsSubService; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index dbbb6ddb1f..90dbaf3312 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -72,6 +72,7 @@ import org.thingsboard.server.gen.edge.v1.RelationRequestMsg; import org.thingsboard.server.gen.edge.v1.RuleChainMetadataRequestMsg; import org.thingsboard.server.gen.edge.v1.UserCredentialsRequestMsg; import org.thingsboard.server.gen.edge.v1.WidgetBundleTypesRequestMsg; +import org.thingsboard.server.service.entitiy.entityView.TbEntityViewService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import java.util.ArrayList; @@ -101,7 +102,7 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService { private DeviceService deviceService; @Autowired - private EntityViewService entityViewService; + private TbEntityViewService entityViewService; @Autowired private DeviceProfileService deviceProfileService; diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/AbstractTbEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/AbstractTbEntityService.java index 8439c1780b..7abe031f2e 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/AbstractTbEntityService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/AbstractTbEntityService.java @@ -21,6 +21,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.HasName; @@ -93,13 +94,13 @@ public abstract class AbstractTbEntityService { @Autowired protected DbCallbackExecutorService dbExecutor; - @Autowired + @Autowired(required = false) protected TbNotificationEntityService notificationEntityService; @Autowired(required = false) protected EdgeService edgeService; @Autowired protected AlarmService alarmService; - @Autowired + @Autowired(required = false) protected EntityActionService entityActionService; @Autowired protected DeviceService deviceService; @@ -111,24 +112,27 @@ public abstract class AbstractTbEntityService { protected TenantService tenantService; @Autowired protected CustomerService customerService; - @Autowired + @Lazy + @Autowired(required = false) protected ClaimDevicesService claimDevicesService; @Autowired protected TbTenantProfileCache tenantProfileCache; @Autowired protected RuleChainService ruleChainService; - @Autowired + @Autowired(required = false) protected TbRuleChainService tbRuleChainService; - @Autowired + @Autowired(required = false) protected EdgeNotificationService edgeNotificationService; @Autowired protected QueueService queueService; @Autowired protected DashboardService dashboardService; + @Autowired protected EntitiesVersionControlService vcService; @Autowired protected EntityViewService entityViewService; + @Lazy @Autowired protected TelemetrySubscriptionService tsSubService; @Autowired @@ -149,7 +153,7 @@ public abstract class AbstractTbEntityService { protected InstallScripts installScripts; @Autowired protected UserService userService; - @Autowired + @Autowired(required = false) protected TbResourceService resourceService; @Autowired protected WidgetsBundleService widgetsBundleService; diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/DefaultTbEntityViewService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/DefaultTbEntityViewService.java index 2eee167e75..cd30276fab 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/DefaultTbEntityViewService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/DefaultTbEntityViewService.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.SettableFuture; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.springframework.util.ConcurrentReferenceHashMap; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; @@ -40,8 +41,9 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.dao.timeseries.TimeseriesService; -import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.entitiy.AbstractTbEntityService; import org.thingsboard.server.service.security.model.SecurityUser; @@ -50,19 +52,22 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import static org.apache.commons.lang3.StringUtils.isBlank; @Service -@TbCoreComponent @AllArgsConstructor @Slf4j public class DefaultTbEntityViewService extends AbstractTbEntityService implements TbEntityViewService { private final TimeseriesService tsService; + final Map>> localCache = new ConcurrentHashMap<>(); + @Override public EntityView save(EntityView entityView, EntityView existingEntityView, SecurityUser user) throws ThingsboardException { ActionType actionType = entityView.getId() == null ? ActionType.ADDED : ActionType.UPDATED; @@ -71,6 +76,9 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen this.updateEntityViewAttributes(user, savedEntityView, existingEntityView); notificationEntityService.notifyCreateOrUpdateEntity(savedEntityView.getTenantId(), savedEntityView.getId(), savedEntityView, null, actionType, user); + localCache.computeIfAbsent(savedEntityView.getTenantId(), (k) -> new ConcurrentReferenceHashMap<>()).clear(); + tbClusterService.broadcastEntityStateChangeEvent(savedEntityView.getTenantId(), savedEntityView.getId(), + entityView.getId() == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); return savedEntityView; } catch (Exception e) { notificationEntityService.notifyEntity(user.getTenantId(), emptyId(EntityType.ENTITY_VIEW), entityView, null, actionType, user, e); @@ -122,6 +130,9 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen entityViewService.deleteEntityView(tenantId, entityViewId); notificationEntityService.notifyDeleteEntity(tenantId, entityViewId, entityView, entityView.getCustomerId(), ActionType.DELETED, relatedEdgeIds, user, entityViewId.toString()); + + localCache.computeIfAbsent(tenantId, (k) -> new ConcurrentReferenceHashMap<>()).clear(); + tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityViewId, ComponentLifecycleEvent.DELETED); } catch (Exception e) { notificationEntityService.notifyEntity(tenantId, emptyId(EntityType.ENTITY_VIEW), null, null, ActionType.DELETED, user, e, entityViewId.toString()); @@ -214,6 +225,51 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen } } + @Override + public ListenableFuture> findEntityViewsByTenantIdAndEntityIdAsync(TenantId tenantId, EntityId entityId) { + Map> localCacheByTenant = localCache.computeIfAbsent(tenantId, (k) -> new ConcurrentReferenceHashMap<>()); + List fromLocalCache = localCacheByTenant.get(entityId); + if (fromLocalCache != null) { + return Futures.immediateFuture(fromLocalCache); + } + + ListenableFuture> future = entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId); + + return Futures.transform(future, (entityViewList) -> { + localCacheByTenant.put(entityId, entityViewList); + return entityViewList; + }, MoreExecutors.directExecutor()); + } + + @Override + public void onComponentLifecycleMsg(ComponentLifecycleMsg componentLifecycleMsg) { + Map> localCacheByTenant = localCache.computeIfAbsent(componentLifecycleMsg.getTenantId(), (k) -> new ConcurrentReferenceHashMap<>()); + EntityViewId entityViewId = new EntityViewId(componentLifecycleMsg.getEntityId().getId()); + deleteOldCacheValue(localCacheByTenant, entityViewId); + if (componentLifecycleMsg.getEvent() != ComponentLifecycleEvent.DELETED) { + EntityView entityView = entityViewService.findEntityViewById(componentLifecycleMsg.getTenantId(), entityViewId); + if (entityView != null) { + localCacheByTenant.remove(entityView.getEntityId()); + } + } + } + + private void deleteOldCacheValue(Map> localCacheByTenant, EntityViewId entityViewId) { + for (var entry : localCacheByTenant.entrySet()) { + EntityView toDelete = null; + for (EntityView view : entry.getValue()) { + if (entityViewId.equals(view.getId())) { + toDelete = view; + break; + } + } + if (toDelete != null) { + entry.getValue().remove(toDelete); + break; + } + } + } + private ListenableFuture> copyAttributesFromEntityToEntityView(EntityView entityView, String scope, Collection keys, SecurityUser user) throws ThingsboardException { EntityViewId entityId = entityView.getId(); if (keys != null && !keys.isEmpty()) { @@ -229,8 +285,8 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen long lastUpdateTs = attributeKvEntry.getLastUpdateTs(); return startTime == 0 && endTime == 0 || (endTime == 0 && startTime < lastUpdateTs) || - (startTime == 0 && endTime > lastUpdateTs) - ? true : startTime < lastUpdateTs && endTime > lastUpdateTs; + (startTime == 0 && endTime > lastUpdateTs) || + (startTime < lastUpdateTs && endTime > lastUpdateTs); }).collect(Collectors.toList()); tsSubService.saveAndNotify(entityView.getTenantId(), entityId, scope, attributes, new FutureCallback() { @Override @@ -345,7 +401,7 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen @Override public void onFailure(Throwable t) { try { - logTimeseriesDeleted(entityView.getTenantId(),user, entityId, keys, t); + logTimeseriesDeleted(entityView.getTenantId(), user, entityId, keys, t); } catch (ThingsboardException e) { log.error("Failed to log timeseries delete", e); } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/TbEntityViewService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/TbEntityViewService.java index e7e150531b..e762877bc3 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/TbEntityViewService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/TbEntityViewService.java @@ -15,22 +15,27 @@ */ package org.thingsboard.server.service.entitiy.entityView; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleListener; import org.thingsboard.server.service.security.model.SecurityUser; -public interface TbEntityViewService { +import java.util.List; + +public interface TbEntityViewService extends ComponentLifecycleListener { EntityView save(EntityView entityView, EntityView existingEntityView, SecurityUser user) throws ThingsboardException; void updateEntityViewAttributes(SecurityUser user, EntityView savedEntityView, EntityView oldEntityView) throws ThingsboardException; - void delete (EntityView entity, SecurityUser user) throws ThingsboardException; + void delete (EntityView entity, SecurityUser user) throws ThingsboardException; EntityView assignEntityViewToCustomer(TenantId tenantId, EntityViewId entityViewId, Customer customer, SecurityUser user) throws ThingsboardException; @@ -46,4 +51,6 @@ public interface TbEntityViewService { EntityView unassignEntityViewFromCustomer(TenantId tenantId, EntityViewId entityViewId, Customer customer, SecurityUser user) throws ThingsboardException; + + ListenableFuture> findEntityViewsByTenantIdAndEntityIdAsync(TenantId tenantId, EntityId entityId); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 01adab5f6f..9e60f69620 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -364,13 +364,14 @@ public class DefaultTbClusterService implements TbClusterService { private void broadcast(ComponentLifecycleMsg msg) { byte[] msgBytes = encodingService.encode(msg); TbQueueProducer> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer(); - Set tbRuleEngineServices = new HashSet<>(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE)); + Set tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE); EntityType entityType = msg.getEntityId().getEntityType(); if (entityType.equals(EntityType.TENANT) || entityType.equals(EntityType.TENANT_PROFILE) || entityType.equals(EntityType.DEVICE_PROFILE) || entityType.equals(EntityType.API_USAGE_STATE) || (entityType.equals(EntityType.DEVICE) && msg.getEvent() == ComponentLifecycleEvent.UPDATED) + || entityType.equals(EntityType.ENTITY_VIEW) || entityType.equals(EntityType.EDGE)) { TbQueueProducer> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer(); Set tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 0c130c9f62..98fb95ad15 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -180,6 +180,8 @@ public abstract class AbstractConsumerService onTimeSeriesUpdate(tenantId, entityId, ts)); if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) { - Futures.addCallback(this.entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), + Futures.addCallback(this.tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), new FutureCallback>() { @Override public void onSuccess(@Nullable List result) { - if (result != null) { + if (result != null && !result.isEmpty()) { Map> tsMap = new HashMap<>(); for (TsKvEntry entry : ts) { tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry); diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseEntityRelationControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseEntityRelationControllerTest.java index 1a032a392b..65558e1955 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseEntityRelationControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseEntityRelationControllerTest.java @@ -45,7 +45,7 @@ import java.util.List; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @Slf4j -public class BaseEntityRelationControllerTest extends AbstractControllerTest { +public abstract class BaseEntityRelationControllerTest extends AbstractControllerTest { public static final String BASE_DEVICE_NAME = "Test dummy device"; diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java index 36343c3662..bb790c78cb 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java @@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.page.TimePageLink; import java.util.List; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleListener.java b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleListener.java new file mode 100644 index 0000000000..2bcba593e4 --- /dev/null +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleListener.java @@ -0,0 +1,20 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.msg.plugin; + +public interface ComponentLifecycleListener { + void onComponentLifecycleMsg(ComponentLifecycleMsg componentLifecycleMsg); +} diff --git a/common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java b/common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java index c8583527f1..cf81690826 100644 --- a/common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java +++ b/common/message/src/test/java/org/thingsboard/server/common/msg/tools/RateLimitsTest.java @@ -15,8 +15,11 @@ */ package org.thingsboard.server.common.msg.tools; +import org.awaitility.pollinterval.FixedPollInterval; +import org.awaitility.pollinterval.PollInterval; import org.junit.Test; +import java.time.Duration; import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; @@ -39,10 +42,11 @@ public class RateLimitsTest { assertThat(rateLimits.tryConsume()).as("new token is available").isFalse(); int expectedRefillTime = (int) (((double) period / capacity) * 1000); - int gap = 100; + int gap = 500; for (int i = 0; i < capacity; i++) { await("token refill for rate limit " + rateLimitConfig) + .pollInterval(new FixedPollInterval(10, TimeUnit.MILLISECONDS)) .atLeast(expectedRefillTime - gap, TimeUnit.MILLISECONDS) .atMost(expectedRefillTime + gap, TimeUnit.MILLISECONDS) .untilAsserted(() -> { @@ -70,6 +74,7 @@ public class RateLimitsTest { int gap = 500; await("tokens refill for rate limit " + rateLimitConfig) + .pollInterval(new FixedPollInterval(10, TimeUnit.MILLISECONDS)) .atLeast(expectedRefillTime - gap, TimeUnit.MILLISECONDS) .atMost(expectedRefillTime + gap, TimeUnit.MILLISECONDS) .untilAsserted(() -> { diff --git a/docker/docker-compose.kafka.yml b/docker/docker-compose.kafka.yml index 09c4554562..1683114605 100644 --- a/docker/docker-compose.kafka.yml +++ b/docker/docker-compose.kafka.yml @@ -19,7 +19,7 @@ version: '2.2' services: kafka: restart: always - image: "wurstmeister/kafka:2.13-2.6.0" + image: "bitnami/kafka:3.2.0" ports: - "9092:9092" env_file: diff --git a/docker/kafka.env b/docker/kafka.env index bd23f99ad1..9c28885252 100644 --- a/docker/kafka.env +++ b/docker/kafka.env @@ -1,12 +1,11 @@ - -KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -KAFKA_LISTENERS=INSIDE://:9093,OUTSIDE://:9092 -KAFKA_ADVERTISED_LISTENERS=INSIDE://:9093,OUTSIDE://kafka:9092 -KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT -KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE -KAFKA_CREATE_TOPICS=js_eval.requests:100:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600,tb_transport.api.requests:3:1:delete --config=retention.ms=60000 --config=segment.bytes=26214400 --config=retention.bytes=104857600 -KAFKA_AUTO_CREATE_TOPICS_ENABLE=false -KAFKA_LOG_RETENTION_BYTES=1073741824 -KAFKA_LOG_SEGMENT_BYTES=268435456 -KAFKA_LOG_RETENTION_MS=300000 -KAFKA_LOG_CLEANUP_POLICY=delete +ALLOW_PLAINTEXT_LISTENER=yes +KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 +KAFKA_CFG_LISTENERS=INSIDE://:9093,OUTSIDE://:9092 +KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE://:9093,OUTSIDE://kafka:9092 +KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT +KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false +KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE +KAFKA_CFG_LOG_RETENTION_BYTES=1073741824 +KAFKA_CFG_SEGMENT_BYTES=268435456 +KAFKA_CFG_LOG_RETENTION_MS=300000 +KAFKA_CFG_LOG_CLEANUP_POLICY=delete diff --git a/pom.xml b/pom.xml index 9fb7ba43e4..cb2264773d 100755 --- a/pom.xml +++ b/pom.xml @@ -98,8 +98,9 @@ 5.0.2 0.2.1 - 2.8.0 + org.apache.kafka.common.network.NetworkReceive class in the application module. It addresses the issue https://issues.apache.org/jira/browse/KAFKA-4090. + Here is the source to track https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/network --> + 3.2.0 4.1.1 2.57 2.7.7 diff --git a/ui-ngx/src/app/modules/home/models/widget-component.models.ts b/ui-ngx/src/app/modules/home/models/widget-component.models.ts index 465cade9f2..4e250b71ea 100644 --- a/ui-ngx/src/app/modules/home/models/widget-component.models.ts +++ b/ui-ngx/src/app/modules/home/models/widget-component.models.ts @@ -50,7 +50,6 @@ import { WidgetTypeId } from '@shared/models/id/widget-type-id'; import { TenantId } from '@shared/models/id/tenant-id'; import { WidgetLayout } from '@shared/models/dashboard.models'; import { formatValue, isDefined } from '@core/utils'; -import { forkJoin, of } from 'rxjs'; import { Store } from '@ngrx/store'; import { AppState } from '@core/core.state'; import { @@ -79,7 +78,9 @@ import { PageLink } from '@shared/models/page/page-link'; import { SortOrder } from '@shared/models/page/sort-order'; import { DomSanitizer } from '@angular/platform-browser'; import { Router } from '@angular/router'; -import { catchError, map, mergeMap, switchMap } from 'rxjs/operators'; +import { EdgeService } from '@core/http/edge.service'; +import * as RxJS from 'rxjs'; +import * as RxJSOperators from 'rxjs/operators'; import { TbPopoverComponent } from '@shared/components/popover.component'; import { EntityId } from '@shared/models/id/entity-id'; @@ -160,6 +161,7 @@ export class WidgetContext { deviceService: DeviceService; assetService: AssetService; entityViewService: EntityViewService; + edgeService: EdgeService; customerService: CustomerService; dashboardService: DashboardService; userService: UserService; @@ -203,7 +205,7 @@ export class WidgetContext { if (this.defaultSubscription) { return this.defaultSubscription.sendOneWayCommand(method, params, timeout, persistent, retries, additionalInfo, requestUUID); } else { - return of(null); + return RxJS.of(null); } }, sendTwoWayCommand: (method, params, timeout, persistent, @@ -211,14 +213,14 @@ export class WidgetContext { if (this.defaultSubscription) { return this.defaultSubscription.sendTwoWayCommand(method, params, timeout, persistent, retries, additionalInfo, requestUUID); } else { - return of(null); + return RxJS.of(null); } }, completedCommand: () => { if (this.defaultSubscription) { return this.defaultSubscription.completedCommand(); } else { - return of(null); + return RxJS.of(null); } } }; @@ -266,12 +268,9 @@ export class WidgetContext { private popoverComponents: TbPopoverComponent[] = []; rxjs = { - forkJoin, - of, - map, - mergeMap, - switchMap, - catchError + + ...RxJS, + ...RxJSOperators }; registerPopoverComponent(popoverComponent: TbPopoverComponent) {