Merge remote-tracking branch 'upstream/master' into bug/fix-dialog-container-style

This commit is contained in:
Chantsova Ekaterina 2024-10-30 12:32:18 +02:00
commit 848f73e211
16 changed files with 233 additions and 7123 deletions

View File

@ -1,5 +1,4 @@
# ThingsBoard
[![Join the chat at https://gitter.im/thingsboard/chat](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/thingsboard/chat?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![ThingsBoard Builds Server Status](https://img.shields.io/teamcity/build/e/ThingsBoard_Build?label=TB%20builds%20server&server=https%3A%2F%2Fbuilds.thingsboard.io&logo=&labelColor=305680)](https://builds.thingsboard.io/viewType.html?buildTypeId=ThingsBoard_Build&guest=1)
ThingsBoard is an open-source IoT platform for data collection, processing, visualization, and device management.

File diff suppressed because it is too large Load Diff

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.service.entitiy.dashboard;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ResourceType;
import org.thingsboard.server.common.data.id.TenantId;
@ -39,6 +40,7 @@ import java.util.stream.Stream;
@TbCoreComponent
@RequiredArgsConstructor
@Slf4j
@ConditionalOnProperty(value = "transport.gateway.dashboard.sync.enabled", havingValue = "true")
public class DashboardSyncService {
private final GitSyncService gitSyncService;
@ -46,8 +48,6 @@ public class DashboardSyncService {
private final WidgetsBundleService widgetsBundleService;
private final PartitionService partitionService;
@Value("${transport.gateway.dashboard.sync.enabled:true}")
private boolean enabled;
@Value("${transport.gateway.dashboard.sync.repository_url:}")
private String repoUrl;
@Value("${transport.gateway.dashboard.sync.branch:main}")
@ -60,9 +60,6 @@ public class DashboardSyncService {
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
public void init() throws Exception {
if (!enabled) {
return;
}
gitSyncService.registerSync(REPO_KEY, repoUrl, branch, TimeUnit.HOURS.toMillis(fetchFrequencyHours), this::update);
}

View File

@ -19,7 +19,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -54,8 +53,8 @@ import org.thingsboard.server.service.install.update.ImagesUpdater;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@ -170,7 +169,6 @@ public class InstallScripts {
loadRuleChainsFromPath(tenantId, edgeChainsDir);
}
@SneakyThrows
private void loadRuleChainsFromPath(TenantId tenantId, Path ruleChainsPath) {
findRuleChainsFromPath(ruleChainsPath).forEach(path -> {
try {
@ -182,12 +180,10 @@ public class InstallScripts {
});
}
List<Path> findRuleChainsFromPath(Path ruleChainsPath) throws IOException {
List<Path> paths = new ArrayList<>();
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(ruleChainsPath, path -> path.toString().endsWith(InstallScripts.JSON_EXT))) {
dirStream.forEach(paths::add);
List<Path> findRuleChainsFromPath(Path ruleChainsPath) {
try (Stream<Path> files = listDir(ruleChainsPath).filter(path -> path.toString().endsWith(InstallScripts.JSON_EXT))) {
return files.toList();
}
return paths;
}
public RuleChain createDefaultRuleChain(TenantId tenantId, String ruleChainName) {
@ -211,11 +207,11 @@ public class InstallScripts {
return ruleChain;
}
public void loadSystemWidgets() throws Exception {
public void loadSystemWidgets() {
log.info("Loading system widgets");
Map<Path, JsonNode> widgetsBundlesMap = new HashMap<>();
Path widgetBundlesDir = Paths.get(getDataDir(), JSON_DIR, SYSTEM_DIR, WIDGET_BUNDLES_DIR);
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(widgetBundlesDir, path -> path.toString().endsWith(JSON_EXT))) {
try (Stream<Path> dirStream = listDir(widgetBundlesDir).filter(path -> path.toString().endsWith(JSON_EXT))) {
dirStream.forEach(
path -> {
JsonNode widgetsBundleDescriptorJson;
@ -247,12 +243,14 @@ public class InstallScripts {
}
Path widgetTypesDir = Paths.get(getDataDir(), JSON_DIR, SYSTEM_DIR, WIDGET_TYPES_DIR);
if (Files.exists(widgetTypesDir)) {
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(widgetTypesDir, path -> path.toString().endsWith(JSON_EXT))) {
try (Stream<Path> dirStream = listDir(widgetTypesDir).filter(path -> path.toString().endsWith(JSON_EXT))) {
dirStream.forEach(
path -> {
try {
JsonNode widgetTypeJson = JacksonUtil.toJsonNode(path.toFile());
WidgetTypeDetails widgetTypeDetails = JacksonUtil.treeToValue(widgetTypeJson, WidgetTypeDetails.class);
String widgetTypeJson = Files.readString(path);
widgetTypeJson = resourceService.checkSystemResourcesUsage(widgetTypeJson, ResourceType.JS_MODULE);
WidgetTypeDetails widgetTypeDetails = JacksonUtil.fromString(widgetTypeJson, WidgetTypeDetails.class);
widgetTypeService.saveWidgetType(widgetTypeDetails);
} catch (Exception e) {
log.error("Unable to load widget type from json: [{}]", path.toString());
@ -300,12 +298,12 @@ public class InstallScripts {
}
}
private void loadSystemScadaSymbols() throws Exception {
private void loadSystemScadaSymbols() {
log.info("Loading system SCADA symbols");
Path scadaSymbolsDir = Paths.get(getDataDir(), JSON_DIR, SYSTEM_DIR, SCADA_SYMBOLS_DIR);
if (Files.exists(scadaSymbolsDir)) {
WidgetTypeDetails scadaSymbolWidgetTemplate = widgetTypeService.findWidgetTypeDetailsByTenantIdAndFqn(TenantId.SYS_TENANT_ID, "scada_symbol");
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(scadaSymbolsDir, path -> path.toString().endsWith(SVG_EXT))) {
try (Stream<Path> dirStream = listDir(scadaSymbolsDir).filter(path -> path.toString().endsWith(SVG_EXT))) {
dirStream.forEach(
path -> {
try {
@ -404,11 +402,10 @@ public class InstallScripts {
imagesUpdater.updateAssetProfilesImages();
}
@SneakyThrows
public void loadSystemImages() {
log.info("Loading system images...");
Stream<Path> dashboardsFiles = Stream.concat(Files.list(Paths.get(getDataDir(), JSON_DIR, DEMO_DIR, DASHBOARDS_DIR)),
Files.list(Paths.get(getDataDir(), JSON_DIR, TENANT_DIR, DASHBOARDS_DIR)));
Stream<Path> dashboardsFiles = Stream.concat(listDir(Paths.get(getDataDir(), JSON_DIR, DEMO_DIR, DASHBOARDS_DIR)),
listDir(Paths.get(getDataDir(), JSON_DIR, TENANT_DIR, DASHBOARDS_DIR)));
try (dashboardsFiles) {
dashboardsFiles.forEach(file -> {
try {
@ -431,11 +428,9 @@ public class InstallScripts {
loadDashboardsFromDir(tenantId, customerId, dashboardsDir);
}
@SneakyThrows
private void loadDashboardsFromDir(TenantId tenantId, CustomerId customerId, Path dashboardsDir) {
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dashboardsDir, path -> path.toString().endsWith(JSON_EXT))) {
dirStream.forEach(
path -> {
try (Stream<Path> dashboards = listDir(dashboardsDir).filter(path -> path.toString().endsWith(JSON_EXT))) {
dashboards.forEach(path -> {
try {
JsonNode dashboardJson = JacksonUtil.toJsonNode(path.toFile());
Dashboard dashboard = JacksonUtil.treeToValue(dashboardJson, Dashboard.class);
@ -448,8 +443,7 @@ public class InstallScripts {
log.error("Unable to load dashboard from json: [{}]", path.toString());
throw new RuntimeException("Unable to load dashboard from json", e);
}
}
);
});
}
}
@ -464,9 +458,9 @@ public class InstallScripts {
}
}
public void createOAuth2Templates() throws Exception {
public void createOAuth2Templates() {
Path oauth2ConfigTemplatesDir = Paths.get(getDataDir(), JSON_DIR, SYSTEM_DIR, OAUTH2_CONFIG_TEMPLATES_DIR);
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(oauth2ConfigTemplatesDir, path -> path.toString().endsWith(JSON_EXT))) {
try (Stream<Path> dirStream = listDir(oauth2ConfigTemplatesDir).filter(path -> path.toString().endsWith(JSON_EXT))) {
dirStream.forEach(
path -> {
try {
@ -489,7 +483,7 @@ public class InstallScripts {
public void loadSystemLwm2mResources() {
Path resourceLwm2mPath = Paths.get(getDataDir(), MODELS_LWM2M_DIR);
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(resourceLwm2mPath, path -> path.toString().endsWith(InstallScripts.XML_EXT))) {
try (Stream<Path> dirStream = listDir(resourceLwm2mPath).filter(path -> path.toString().endsWith(InstallScripts.XML_EXT))) {
dirStream.forEach(
path -> {
try {
@ -539,9 +533,11 @@ public class InstallScripts {
}
}
private Stream<Path> listDir(Path resourcesDir) {
private Stream<Path> listDir(Path dir) {
try {
return Files.list(resourcesDir);
return Files.list(dir);
} catch (NoSuchFileException e) {
return Stream.empty();
} catch (IOException e) {
throw new UncheckedIOException(e);
}

View File

@ -15,13 +15,18 @@
*/
package org.thingsboard.server.service.entitiy.dashboard;
import org.junit.After;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mock.web.MockHttpServletResponse;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.controller.AbstractControllerTest;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.dao.sql.resource.TbResourceRepository;
import org.thingsboard.server.dao.sql.widget.WidgetTypeRepository;
import org.thingsboard.server.dao.sql.widget.WidgetsBundleRepository;
import java.util.concurrent.TimeUnit;
@ -35,6 +40,20 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
})
public class DashboardSyncServiceTest extends AbstractControllerTest {
@Autowired
private WidgetTypeRepository widgetTypeRepository;
@Autowired
private WidgetsBundleRepository widgetsBundleRepository;
@Autowired
private TbResourceRepository resourceRepository;
@After
public void after() throws Exception {
widgetsBundleRepository.deleteAll();
widgetTypeRepository.deleteAll();
resourceRepository.deleteAll();
}
@Test
public void testGatewaysDashboardSync() throws Exception {
loginTenantAdmin();

View File

@ -39,7 +39,6 @@ import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.service.install.update.ImagesUpdater;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
@ -87,14 +86,14 @@ class InstallScriptsTest {
}
@Test
void testDefaultRuleChainsTemplates() throws IOException {
void testDefaultRuleChainsTemplates() {
Path dir = installScripts.getTenantRuleChainsDir();
installScripts.findRuleChainsFromPath(dir)
.forEach(this::validateRuleChainTemplate);
}
@Test
void testDefaultEdgeRuleChainsTemplates() throws IOException {
void testDefaultEdgeRuleChainsTemplates() {
Path dir = installScripts.getEdgeRuleChainsDir();
installScripts.findRuleChainsFromPath(dir)
.forEach(this::validateRuleChainTemplate);

View File

@ -17,9 +17,7 @@
<logger name="org.eclipse.leshan" level="INFO"/>
<logger name="org.thingsboard.server.controller.AbstractWebTest" level="INFO"/>
<logger name="org.thingsboard.server.service.script" level="INFO"/>
<logger name="org.thingsboard.server.service.entitiy.dashboard" level="TRACE"/>
<logger name="org.thingsboard.server.service.entitiy.widgets" level="TRACE"/>
<logger name="org.thingsboard.server.service.sync" level="TRACE"/>
<!-- mute TelemetryEdgeSqlTest that causes a lot of randomly generated errors -->
<logger name="org.thingsboard.server.service.edge.rpc.EdgeGrpcSession" level="OFF"/>

View File

@ -17,15 +17,19 @@ package org.thingsboard.server.dao.resource;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.springframework.transaction.event.TransactionalEventListener;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.RegexUtils;
import org.thingsboard.server.cache.resourceInfo.ResourceInfoCacheKey;
import org.thingsboard.server.cache.resourceInfo.ResourceInfoEvictEvent;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ResourceType;
import org.thingsboard.server.common.data.TbResource;
@ -54,7 +58,7 @@ import static org.thingsboard.server.dao.service.Validator.validateId;
@Service("TbResourceDaoService")
@Slf4j
@AllArgsConstructor
@RequiredArgsConstructor
@Primary
public class BaseResourceService extends AbstractCachedEntityService<ResourceInfoCacheKey, TbResourceInfo, ResourceInfoEvictEvent> implements ResourceService {
@ -62,6 +66,8 @@ public class BaseResourceService extends AbstractCachedEntityService<ResourceInf
protected final TbResourceDao resourceDao;
protected final TbResourceInfoDao resourceInfoDao;
protected final ResourceDataValidator resourceValidator;
@Autowired @Lazy
private ImageService imageService;
@Override
public TbResource saveResource(TbResource resource, boolean doValidate) {
@ -243,6 +249,11 @@ public class BaseResourceService extends AbstractCachedEntityService<ResourceInf
public TbResource createOrUpdateSystemResource(ResourceType resourceType, String resourceKey, String data) {
if (resourceType == ResourceType.DASHBOARD) {
data = checkSystemResourcesUsage(data, ResourceType.JS_MODULE);
Dashboard dashboard = JacksonUtil.fromString(data, Dashboard.class);
dashboard.setTenantId(TenantId.SYS_TENANT_ID);
imageService.replaceBase64WithImageUrl(dashboard);
data = JacksonUtil.toString(dashboard);
}
TbResource resource = findResourceByTenantIdAndKey(TenantId.SYS_TENANT_ID, resourceType, resourceKey);

View File

@ -19,8 +19,10 @@ const path = require('path');
const typeDir = path.join('.', 'target', 'types');
const srcDir = path.join('.', 'target', 'types', 'src');
const stylesCss = path.join(srcDir, 'styles.css');
const moduleMapPath = path.join('src', 'app', 'modules', 'common', 'modules-map.ts');
const ngcPath = path.join('.', 'node_modules', '.bin', 'ngc');
const tailwindcss = path.join('.', 'node_modules', '.bin', 'tailwindcss');
const tsconfigPath = path.join('src', 'tsconfig.app.json');
console.log(`Remove directory: ${typeDir}`);
@ -31,12 +33,28 @@ try {
}
const cliCommand = `${ngcPath} --p ${tsconfigPath} --declaration --outDir ${srcDir}`;
console.log(cliCommand);
try {
executeCliCommand(cliCommand, 'Build types');
fromDir(srcDir, /(\.js|\.js\.map)$/, function (filename) {
try {
fs.rmSync(filename);
} catch (err) {
console.error(`Remove file error ${filename}: ${err}`);
}
});
fs.cpSync(moduleMapPath, `${typeDir}/${moduleMapPath}`);
const generateStyleCssCommand = `${tailwindcss} -o ${stylesCss} --minify`;
executeCliCommand(generateStyleCssCommand, 'Generate styles.css');
function executeCliCommand(cliCommand, description) {
console.log(cliCommand);
try {
child_process.execSync(cliCommand);
} catch (err) {
console.error("Build types", err);
} catch (err) {
console.error(description, err);
process.exit(1);
}
}
function fromDir(startPath, filter, callback) {
@ -56,13 +74,3 @@ function fromDir(startPath, filter, callback) {
}
}
}
fromDir(srcDir, /(\.js|\.js\.map)$/, function (filename) {
try {
fs.rmSync(filename);
} catch (err) {
console.error(`Remove file error ${filename}: ${err}`);
}
});
fs.cpSync(moduleMapPath, `${typeDir}/${moduleMapPath}`);

View File

@ -61,6 +61,9 @@ export class AliasController implements IAliasController {
resolvedAliases: { [aliasId: string]: AliasInfo } = {};
resolvedAliasesObservable: { [aliasId: string]: Observable<AliasInfo> } = {};
resolvedDevices: { [deviceId: string]: EntityInfo } = {};
resolvedDevicesObservable: { [deviceId: string]: Observable<EntityInfo> } = {};
resolvedAliasesToStateEntities: { [aliasId: string]: StateEntityInfo } = {};
constructor(private utils: UtilsService,
@ -261,9 +264,35 @@ export class AliasController implements IAliasController {
}
resolveSingleEntityInfoForDeviceId(deviceId: string): Observable<EntityInfo> {
let entityInfo = this.resolvedDevices[deviceId];
if (entityInfo) {
return of(entityInfo);
} else if (this.resolvedDevicesObservable[deviceId]) {
return this.resolvedDevicesObservable[deviceId];
} else {
const resolvedDeviceSubject = new ReplaySubject<EntityInfo>();
this.resolvedDevicesObservable[deviceId] = resolvedDeviceSubject.asObservable();
const entityFilter = singleEntityFilterFromDeviceId(deviceId);
return this.entityService.findSingleEntityInfoByEntityFilter(entityFilter,
{ignoreLoading: true, ignoreErrors: true});
this.entityService.findSingleEntityInfoByEntityFilter(entityFilter,
{ignoreLoading: true, ignoreErrors: true}).subscribe(
(resolvedEntityInfo) => {
this.resolvedDevices[deviceId] = resolvedEntityInfo;
delete this.resolvedDevicesObservable[deviceId];
resolvedDeviceSubject.next(resolvedEntityInfo);
resolvedDeviceSubject.complete();
},
() => {
resolvedDeviceSubject.error(null);
delete this.resolvedDevicesObservable[deviceId];
}
);
entityInfo = this.resolvedDevices[deviceId];
if (entityInfo) {
return of(entityInfo);
} else {
return this.resolvedDevicesObservable[deviceId];
}
}
}
resolveSingleEntityInfoForTargetDevice(targetDevice: TargetDevice): Observable<EntityInfo> {

View File

@ -40,24 +40,15 @@ import { WindowMessage } from '@shared/models/window-message.model';
import { TranslateService } from '@ngx-translate/core';
import { customTranslationsPrefix, i18nPrefix } from '@app/shared/models/constants';
import { DataKey, Datasource, DatasourceType, KeyInfo } from '@shared/models/widget.models';
import { DataKeyType } from '@app/shared/models/telemetry/telemetry.models';
import {
alarmFields,
alarmSeverityTranslations,
alarmStatusTranslations
} from '@shared/models/alarm.models';
import { DataKeyType, SharedTelemetrySubscriber } from '@app/shared/models/telemetry/telemetry.models';
import { alarmFields, alarmSeverityTranslations, alarmStatusTranslations } from '@shared/models/alarm.models';
import { materialColors } from '@app/shared/models/material.models';
import { WidgetInfo } from '@home/models/widget-component.models';
import jsonSchemaDefaults from 'json-schema-defaults';
import { Observable } from 'rxjs';
import { publishReplay, refCount } from 'rxjs/operators';
import { WidgetContext } from '@app/modules/home/models/widget-component.models';
import {
AttributeData,
LatestTelemetry,
TelemetrySubscriber,
TelemetryType
} from '@shared/models/telemetry/telemetry.models';
import { AttributeData, LatestTelemetry, TelemetryType } from '@shared/models/telemetry/telemetry.models';
import { EntityId } from '@shared/models/id/entity-id';
import { DatePipe, DOCUMENT } from '@angular/common';
import { entityTypeTranslations } from '@shared/models/entity-type.models';
@ -483,13 +474,13 @@ export class UtilsService {
if (!entityId && ctx.datasources.length > 0) {
entityId = this.getEntityIdFromDatasource(ctx.datasources[0]);
}
const subscription = TelemetrySubscriber.createEntityAttributesSubscription(ctx.telemetryWsService, entityId, type, ctx.ngZone, keys);
const subscription = SharedTelemetrySubscriber.createEntityAttributesSubscription(ctx.telemetryWsService, entityId, type, ctx.ngZone, keys);
if (!ctx.telemetrySubscribers) {
ctx.telemetrySubscribers = [];
}
ctx.telemetrySubscribers.push(subscription);
subscription.subscribe();
return subscription.attributeData$().pipe(
return subscription.attributeData$.pipe(
publishReplay(1),
refCount()
);

View File

@ -17,8 +17,7 @@
import {
AttributeData,
AttributeScope,
LatestTelemetry,
TelemetrySubscriber,
LatestTelemetry, SharedTelemetrySubscriber,
TelemetryType,
telemetryTypeTranslationsShort
} from '@shared/models/telemetry/telemetry.models';
@ -436,7 +435,7 @@ export class ExecuteRpcValueGetter<V> extends ValueGetter<V> {
export abstract class TelemetryValueGetter<V, S extends TelemetryValueSettings> extends ValueGetter<V> {
protected targetEntityId: EntityId;
private telemetrySubscriber: TelemetrySubscriber;
private telemetrySubscriber: SharedTelemetrySubscriber;
protected constructor(protected ctx: WidgetContext,
protected settings: GetValueSettings<V>,
@ -470,10 +469,10 @@ export abstract class TelemetryValueGetter<V, S extends TelemetryValueSettings>
private subscribeForTelemetryValue(): Observable<V> {
this.telemetrySubscriber =
TelemetrySubscriber.createEntityAttributesSubscription(this.ctx.telemetryWsService, this.targetEntityId,
SharedTelemetrySubscriber.createEntityAttributesSubscription(this.ctx.telemetryWsService, this.targetEntityId,
this.scope(), this.ctx.ngZone, [this.getTelemetryValueSettings().key]);
this.telemetrySubscriber.subscribe();
return this.telemetrySubscriber.attributeData$().pipe(
return this.telemetrySubscriber.attributeData$.pipe(
map((data) => {
let value: V = null;
const entry = data.find(attr => attr.key === this.getTelemetryValueSettings().key);

View File

@ -25,7 +25,7 @@ import {
AttributeData,
AttributeScope,
isClientSideTelemetryType,
TelemetrySubscriber,
SharedTelemetrySubscriber,
TelemetryType
} from '@shared/models/telemetry/telemetry.models';
import { AttributeService } from '@core/http/attribute.service';
@ -42,7 +42,7 @@ export class AttributeDatasource implements DataSource<AttributeData> {
public selection = new SelectionModel<AttributeData>(true, []);
private allAttributes: Observable<Array<AttributeData>>;
private telemetrySubscriber: TelemetrySubscriber;
private telemetrySubscriber: SharedTelemetrySubscriber;
constructor(private attributeService: AttributeService,
private telemetryWsService: TelemetryWebsocketService,
@ -99,10 +99,10 @@ export class AttributeDatasource implements DataSource<AttributeData> {
if (!this.allAttributes) {
let attributesObservable: Observable<Array<AttributeData>>;
if (isClientSideTelemetryType.get(attributesScope)) {
this.telemetrySubscriber = TelemetrySubscriber.createEntityAttributesSubscription(
this.telemetrySubscriber = SharedTelemetrySubscriber.createEntityAttributesSubscription(
this.telemetryWsService, entityId, attributesScope, this.zone);
this.telemetrySubscriber.subscribe();
attributesObservable = this.telemetrySubscriber.attributeData$();
attributesObservable = this.telemetrySubscriber.attributeData$;
} else {
attributesObservable = this.attributeService.getEntityAttributes(entityId, attributesScope as AttributeScope);
}

View File

@ -98,7 +98,9 @@ import * as RxJSOperators from 'rxjs/operators';
import { TbPopoverComponent } from '@shared/components/popover.component';
import { EntityId } from '@shared/models/id/entity-id';
import { AlarmQuery, AlarmSearchStatus, AlarmStatus } from '@app/shared/models/alarm.models';
import { ImagePipe, MillisecondsToTimeStringPipe, TelemetrySubscriber } from '@app/shared/public-api';
import { ImagePipe } from '@shared/pipe/image.pipe';
import { MillisecondsToTimeStringPipe } from '@shared/pipe/milliseconds-to-time-string.pipe';
import { SharedTelemetrySubscriber, TelemetrySubscriber } from '@shared/models/telemetry/telemetry.models';
import { UserId } from '@shared/models/id/user-id';
import { UserSettingsService } from '@core/http/user-settings.service';
import { DataKeySettingsFunction } from '@home/components/widget/config/data-keys.component.models';
@ -204,7 +206,7 @@ export class WidgetContext {
userSettingsService: UserSettingsService;
utilsService: UtilsService;
telemetryWsService: TelemetryWebsocketService;
telemetrySubscribers?: TelemetrySubscriber[];
telemetrySubscribers?: Array<TelemetrySubscriber | SharedTelemetrySubscriber>;
date: DatePipe;
imagePipe: ImagePipe;
milliSecondsToTimeString: MillisecondsToTimeStringPipe;

View File

@ -17,7 +17,7 @@
import { EntityType } from '@shared/models/entity-type.models';
import { AggregationType } from '../time/time.models';
import { BehaviorSubject, Observable, ReplaySubject } from 'rxjs';
import { BehaviorSubject, connectable, Observable, ReplaySubject, Subscription } from 'rxjs';
import { EntityId } from '@shared/models/id/entity-id';
import { map } from 'rxjs/operators';
import { NgZone } from '@angular/core';
@ -731,6 +731,91 @@ export class NotificationsUpdate extends CmdUpdate {
}
}
interface SharedSubscriptionInfo {
key: string;
subscriber: TelemetrySubscriber;
subscribed: boolean;
sharedSubscribers: Set<SharedTelemetrySubscriber>;
}
export class SharedTelemetrySubscriber {
private static subscribersCache: {[key: string]: SharedSubscriptionInfo} = {};
private static createTelemetrySubscriberKey (entityId: EntityId, attributeScope: TelemetryType, keys: string[] = null): string {
let key = entityId.entityType + '_' + entityId.id + '_' + attributeScope;
if (keys) {
key += '_' + keys.sort().join('_');
}
return key;
}
private subscribed = false;
private attributeDataSubject = connectable(this.sharedSubscriptionInfo.subscriber.attributeData$(),
{ connector: () => new ReplaySubject<Array<AttributeData>>(1)});
private subscriptions = new Array<Subscription>();
public attributeData$: Observable<Array<AttributeData>> = this.attributeDataSubject; //this.attributeDataSubject.asObservable();
public static createEntityAttributesSubscription(telemetryService: TelemetryWebsocketService,
entityId: EntityId, attributeScope: TelemetryType,
zone: NgZone, keys: string[] = null): SharedTelemetrySubscriber {
const key = SharedTelemetrySubscriber.createTelemetrySubscriberKey(entityId, attributeScope, keys);
let info = SharedTelemetrySubscriber.subscribersCache[key];
if (!info) {
const subscriber = TelemetrySubscriber.createEntityAttributesSubscription(
telemetryService, entityId, attributeScope, zone, keys
);
info = {
key,
subscriber,
subscribed: false,
sharedSubscribers: new Set<SharedTelemetrySubscriber>()
};
SharedTelemetrySubscriber.subscribersCache[key] = info;
}
const sharedSubscriber = new SharedTelemetrySubscriber(info);
info.sharedSubscribers.add(sharedSubscriber);
return sharedSubscriber;
}
private constructor(private sharedSubscriptionInfo: SharedSubscriptionInfo) {
}
public subscribe() {
if (!this.subscribed) {
this.subscribed = true;
this.subscriptions.push(this.attributeDataSubject.connect());
if (!this.sharedSubscriptionInfo.subscribed) {
this.sharedSubscriptionInfo.subscriber.subscribe();
this.sharedSubscriptionInfo.subscribed = true;
}
}
}
public unsubscribe() {
if (this.subscribed) {
this.complete();
}
this.sharedSubscriptionInfo.sharedSubscribers.delete(this);
if (!this.sharedSubscriptionInfo.sharedSubscribers.size) {
if (this.sharedSubscriptionInfo.subscribed) {
this.sharedSubscriptionInfo.subscriber.unsubscribe();
this.sharedSubscriptionInfo.subscribed = false;
}
delete SharedTelemetrySubscriber.subscribersCache[this.sharedSubscriptionInfo.key];
}
}
private complete() {
this.subscriptions.forEach(subscription => subscription.unsubscribe());
this.subscriptions.length = 0;
}
}
export class TelemetrySubscriber extends WsSubscriber {
private dataSubject = new ReplaySubject<SubscriptionUpdate>(1);