From 43c2ae653d2b47d2725d54e6ba1cb633d1c0f5a4 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Mon, 11 Sep 2023 17:04:25 +0300 Subject: [PATCH] Resource support for edge --- .../edge/DefaultEdgeNotificationService.java | 7 ++ .../service/edge/EdgeContextComponent.java | 8 ++ .../service/edge/rpc/EdgeGrpcSession.java | 8 ++ .../service/edge/rpc/EdgeSyncCursor.java | 4 + .../constructor/ResourceMsgConstructor.java | 57 ++++++++++ .../fetch/BaseResourceEdgeEventFetcher.java | 49 +++++++++ .../SystemResourcesEdgeEventFetcher.java | 34 ++++++ .../TenantResourcesEdgeEventFetcher.java | 36 +++++++ .../TenantWidgetsBundlesEdgeEventFetcher.java | 1 + .../edge/rpc/processor/BaseEdgeProcessor.java | 12 +++ .../resource/BaseResourceProcessor.java | 61 +++++++++++ .../resource/ResourceEdgeProcessor.java | 100 ++++++++++++++++++ .../server/edge/AbstractEdgeTest.java | 3 +- .../asset/AssetEdgeProcessorTest.java | 3 +- .../asset/AssetProfileEdgeProcessorTest.java | 2 +- .../device/DeviceEdgeProcessorTest.java | 3 +- .../DeviceProfileEdgeProcessorTest.java | 3 +- .../server/dao/resource/ResourceService.java | 4 + .../common/data/edge/EdgeEventType.java | 3 +- .../common/data/id/EntityIdFactory.java | 2 + common/edge-api/src/main/proto/edge.proto | 15 +++ .../dao/resource/BaseResourceService.java | 28 ++++- 22 files changed, 430 insertions(+), 13 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/ResourceMsgConstructor.java create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseResourceEdgeEventFetcher.java create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/SystemResourcesEdgeEventFetcher.java create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantResourcesEdgeEventFetcher.java create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/resource/BaseResourceProcessor.java create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/resource/ResourceEdgeProcessor.java diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index 69852e6c05..f3832d8fcf 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -48,6 +48,7 @@ import org.thingsboard.server.service.edge.rpc.processor.entityview.EntityViewEd import org.thingsboard.server.service.edge.rpc.processor.ota.OtaPackageEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.queue.QueueEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.relation.RelationEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.resource.ResourceEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.rule.RuleChainEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.tenant.TenantEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.tenant.TenantProfileEdgeProcessor; @@ -125,6 +126,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { @Autowired private RelationEdgeProcessor relationProcessor; + @Autowired + private ResourceEdgeProcessor resourceEdgeProcessor; + @Autowired protected ApplicationEventPublisher eventPublisher; @@ -215,6 +219,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { case TENANT_PROFILE: future = tenantProfileEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg); break; + case TB_RESOURCE: + future = resourceEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; default: log.warn("[{}] Edge event type [{}] is not designed to be pushed to edge", tenantId, type); future = Futures.immediateFuture(null); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index dc83e0f2a2..7850032142 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -33,6 +33,7 @@ import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.ota.OtaPackageService; import org.thingsboard.server.dao.queue.QueueService; +import org.thingsboard.server.dao.resource.ResourceService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.settings.AdminSettingsService; import org.thingsboard.server.dao.tenant.TenantProfileService; @@ -55,6 +56,7 @@ import org.thingsboard.server.service.edge.rpc.processor.entityview.EntityViewEd import org.thingsboard.server.service.edge.rpc.processor.ota.OtaPackageEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.queue.QueueEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.relation.RelationEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.resource.ResourceEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.rule.RuleChainEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.settings.AdminSettingsEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.telemetry.TelemetryEdgeProcessor; @@ -139,6 +141,9 @@ public class EdgeContextComponent { @Autowired private QueueService queueService; + @Autowired + private ResourceService resourceService; + @Autowired private AlarmEdgeProcessor alarmProcessor; @@ -199,6 +204,9 @@ public class EdgeContextComponent { @Autowired private TenantProfileEdgeProcessor tenantProfileEdgeProcessor; + @Autowired + private ResourceEdgeProcessor resourceEdgeProcessor; + @Autowired private EdgeMsgConstructor edgeMsgConstructor; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index cfb92d0bfd..cf3f325260 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -63,6 +63,7 @@ import org.thingsboard.server.gen.edge.v1.RelationRequestMsg; import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg; import org.thingsboard.server.gen.edge.v1.RequestMsg; import org.thingsboard.server.gen.edge.v1.RequestMsgType; +import org.thingsboard.server.gen.edge.v1.ResourceUpdateMsg; import org.thingsboard.server.gen.edge.v1.ResponseMsg; import org.thingsboard.server.gen.edge.v1.RuleChainMetadataRequestMsg; import org.thingsboard.server.gen.edge.v1.SyncCompletedMsg; @@ -645,6 +646,8 @@ public final class EdgeGrpcSession implements Closeable { return ctx.getAdminSettingsProcessor().convertAdminSettingsEventToDownlink(edgeEvent); case OTA_PACKAGE: return ctx.getOtaPackageEdgeProcessor().convertOtaPackageEventToDownlink(edgeEvent); + case TB_RESOURCE: + return ctx.getResourceEdgeProcessor().convertResourceEventToDownlink(edgeEvent); case QUEUE: return ctx.getQueueEdgeProcessor().convertQueueEventToDownlink(edgeEvent); case TENANT: @@ -710,6 +713,11 @@ public final class EdgeGrpcSession implements Closeable { result.add(ctx.getDashboardProcessor().processDashboardMsgFromEdge(edge.getTenantId(), edge, dashboardUpdateMsg)); } } + if (uplinkMsg.getResourceUpdateMsgCount() > 0) { + for (ResourceUpdateMsg resourceUpdateMsg : uplinkMsg.getResourceUpdateMsgList()) { + result.add(ctx.getResourceEdgeProcessor().processResourceMsgFromEdge(edge.getTenantId(), resourceUpdateMsg)); + } + } if (uplinkMsg.getRuleChainMetadataRequestMsgCount() > 0) { for (RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg : uplinkMsg.getRuleChainMetadataRequestMsgList()) { result.add(ctx.getEdgeRequestsService().processRuleChainMetadataRequestMsg(edge.getTenantId(), edge, ruleChainMetadataRequestMsg)); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java index a39dae3b54..692a46ff44 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java @@ -33,10 +33,12 @@ import org.thingsboard.server.service.edge.rpc.fetch.EntityViewsEdgeEventFetcher import org.thingsboard.server.service.edge.rpc.fetch.OtaPackagesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.QueuesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.RuleChainsEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.SystemResourcesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.SystemWidgetTypesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.SystemWidgetsBundlesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.TenantAdminUsersEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.TenantEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.TenantResourcesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetTypesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetsBundlesEdgeEventFetcher; @@ -77,6 +79,8 @@ public class EdgeSyncCursor { fetchers.add(new SystemWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService())); fetchers.add(new TenantWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService())); fetchers.add(new OtaPackagesEdgeEventFetcher(ctx.getOtaPackageService())); + fetchers.add(new SystemResourcesEdgeEventFetcher(ctx.getResourceService())); + fetchers.add(new TenantResourcesEdgeEventFetcher(ctx.getResourceService())); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/ResourceMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/ResourceMsgConstructor.java new file mode 100644 index 0000000000..27b7dc3978 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/ResourceMsgConstructor.java @@ -0,0 +1,57 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.constructor; + +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.TbResource; +import org.thingsboard.server.common.data.id.TbResourceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.edge.v1.ResourceUpdateMsg; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import org.thingsboard.server.queue.util.TbCoreComponent; + +@Component +@TbCoreComponent +public class ResourceMsgConstructor { + + public ResourceUpdateMsg constructResourceUpdatedMsg(UpdateMsgType msgType, TbResource tbResource) { + ResourceUpdateMsg.Builder builder = ResourceUpdateMsg.newBuilder() + .setMsgType(msgType) + .setIdMSB(tbResource.getId().getId().getMostSignificantBits()) + .setIdLSB(tbResource.getId().getId().getLeastSignificantBits()) + .setTitle(tbResource.getTitle()) + .setResourceKey(tbResource.getResourceKey()) + .setResourceType(tbResource.getResourceType().name()) + .setFileName(tbResource.getFileName()); + if (tbResource.getData() != null) { + builder.setData(tbResource.getData()); + } + if (tbResource.getEtag() != null) { + builder.setEtag(tbResource.getEtag()); + } + if (tbResource.getTenantId().equals(TenantId.SYS_TENANT_ID)) { + builder.setIsSystem(true); + } + return builder.build(); + } + + public ResourceUpdateMsg constructResourceDeleteMsg(TbResourceId tbResourceId) { + return ResourceUpdateMsg.newBuilder() + .setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE) + .setIdMSB(tbResourceId.getId().getMostSignificantBits()) + .setIdLSB(tbResourceId.getId().getLeastSignificantBits()).build(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseResourceEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseResourceEdgeEventFetcher.java new file mode 100644 index 0000000000..5e0ea661e0 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseResourceEdgeEventFetcher.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.TbResource; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.resource.ResourceService; + +@Slf4j +@AllArgsConstructor +public abstract class BaseResourceEdgeEventFetcher extends BasePageableEdgeEventFetcher { + + protected final ResourceService resourceService; + + @Override + PageData fetchPageData(TenantId tenantId, Edge edge, PageLink pageLink) { + return findTenantResources(tenantId, pageLink); + } + + @Override + EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, TbResource tbResource) { + return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.TB_RESOURCE, + EdgeEventActionType.ADDED, tbResource.getId(), null); + } + + protected abstract PageData findTenantResources(TenantId tenantId, PageLink pageLink); +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/SystemResourcesEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/SystemResourcesEdgeEventFetcher.java new file mode 100644 index 0000000000..42700ded3e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/SystemResourcesEdgeEventFetcher.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import org.thingsboard.server.common.data.TbResource; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.resource.ResourceService; + +public class SystemResourcesEdgeEventFetcher extends BaseResourceEdgeEventFetcher { + + public SystemResourcesEdgeEventFetcher(ResourceService resourceService) { + super(resourceService); + } + + @Override + protected PageData findTenantResources(TenantId tenantId, PageLink pageLink) { + return resourceService.findAllTenantResources(TenantId.SYS_TENANT_ID, pageLink); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantResourcesEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantResourcesEdgeEventFetcher.java new file mode 100644 index 0000000000..0992565285 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantResourcesEdgeEventFetcher.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.TbResource; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.resource.ResourceService; + +@Slf4j +public class TenantResourcesEdgeEventFetcher extends BaseResourceEdgeEventFetcher { + + public TenantResourcesEdgeEventFetcher(ResourceService resourceService) { + super(resourceService); + } + + @Override + protected PageData findTenantResources(TenantId tenantId, PageLink pageLink) { + return resourceService.findAllTenantResources(tenantId, pageLink); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java index ffddb65315..8f69a904e8 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java @@ -28,6 +28,7 @@ public class TenantWidgetsBundlesEdgeEventFetcher extends BaseWidgetsBundlesEdge public TenantWidgetsBundlesEdgeEventFetcher(WidgetsBundleService widgetsBundleService) { super(widgetsBundleService); } + @Override protected PageData findWidgetsBundles(TenantId tenantId, PageLink pageLink) { return widgetsBundleService.findTenantWidgetsBundlesByTenantId(tenantId, pageLink); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 328a64efe2..99375dc747 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.AssetProfile; import org.thingsboard.server.common.data.edge.Edge; @@ -72,6 +73,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.ota.OtaPackageService; import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.dao.resource.ResourceService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.tenant.TenantProfileService; @@ -101,6 +103,7 @@ import org.thingsboard.server.service.edge.rpc.constructor.OtaPackageMsgConstruc import org.thingsboard.server.service.edge.rpc.constructor.QueueMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.RelationMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.ResourceMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.TenantMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.TenantProfileMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor; @@ -211,6 +214,9 @@ public abstract class BaseEdgeProcessor { @Autowired protected PartitionService partitionService; + @Autowired + protected ResourceService resourceService; + @Autowired @Lazy protected TbQueueProducerProvider producerProvider; @@ -233,6 +239,9 @@ public abstract class BaseEdgeProcessor { @Autowired protected DataValidator entityViewValidator; + @Autowired + protected DataValidator resourceValidator; + @Autowired protected EdgeMsgConstructor edgeMsgConstructor; @@ -293,6 +302,9 @@ public abstract class BaseEdgeProcessor { @Autowired protected QueueMsgConstructor queueMsgConstructor; + @Autowired + protected ResourceMsgConstructor resourceMsgConstructor; + @Autowired protected EdgeSynchronizationManager edgeSynchronizationManager; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/resource/BaseResourceProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/resource/BaseResourceProcessor.java new file mode 100644 index 0000000000..13ca012f3c --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/resource/BaseResourceProcessor.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.resource; + +import com.datastax.oss.driver.api.core.uuid.Uuids; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.ResourceType; +import org.thingsboard.server.common.data.TbResource; +import org.thingsboard.server.common.data.TbResourceInfo; +import org.thingsboard.server.common.data.id.TbResourceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.edge.v1.ResourceUpdateMsg; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; + +@Slf4j +public abstract class BaseResourceProcessor extends BaseEdgeProcessor { + + protected void saveOrUpdateTbResource(TenantId tenantId, TbResourceId tbResourceId, ResourceUpdateMsg resourceUpdateMsg) { + try { + boolean created = false; + TbResource resource = resourceService.findResourceById(tenantId, tbResourceId); + if (resource == null) { + resource = new TbResource(); + if (resourceUpdateMsg.getIsSystem()) { + resource.setTenantId(TenantId.SYS_TENANT_ID); + } else { + resource.setTenantId(tenantId); + } + resource.setCreatedTime(Uuids.unixTimestamp(tbResourceId.getId())); + created = true; + } + resource.setTitle(resourceUpdateMsg.getTitle()); + resource.setResourceKey(resourceUpdateMsg.getResourceKey()); + resource.setResourceType(ResourceType.valueOf(resourceUpdateMsg.getResourceType())); + resource.setFileName(resourceUpdateMsg.getFileName()); + resource.setData(resourceUpdateMsg.hasData() ? resourceUpdateMsg.getData() : null); + resource.setEtag(resourceUpdateMsg.hasEtag() ? resourceUpdateMsg.getEtag() : null); + resourceValidator.validate(resource, TbResourceInfo::getTenantId); + if (created) { + resource.setId(tbResourceId); + } + resourceService.saveResource(resource, false); + } catch (Exception e) { + log.error("[{}] Failed to process resource update msg [{}]", tenantId, resourceUpdateMsg, e); + throw e; + } + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/resource/ResourceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/resource/ResourceEdgeProcessor.java new file mode 100644 index 0000000000..ebc9a5440e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/resource/ResourceEdgeProcessor.java @@ -0,0 +1,100 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.resource; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.TbResource; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.id.TbResourceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.exception.DataValidationException; +import org.thingsboard.server.gen.edge.v1.DownlinkMsg; +import org.thingsboard.server.gen.edge.v1.ResourceUpdateMsg; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.UUID; + +@Component +@Slf4j +@TbCoreComponent +public class ResourceEdgeProcessor extends BaseResourceProcessor { + + public ListenableFuture processResourceMsgFromEdge(TenantId tenantId, ResourceUpdateMsg resourceUpdateMsg) { + TbResourceId tbResourceId = new TbResourceId(new UUID(resourceUpdateMsg.getIdMSB(), resourceUpdateMsg.getIdLSB())); + try { + edgeSynchronizationManager.getSync().set(true); + + switch (resourceUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + case ENTITY_UPDATED_RPC_MESSAGE: + super.saveOrUpdateTbResource(tenantId, tbResourceId, resourceUpdateMsg); + break; + case ENTITY_DELETED_RPC_MESSAGE: + TbResource tbResourceToDelete = resourceService.findResourceById(tenantId, tbResourceId); + if (tbResourceToDelete != null) { + resourceService.deleteResource(tenantId, tbResourceId); + } + break; + case UNRECOGNIZED: + return handleUnsupportedMsgType(resourceUpdateMsg.getMsgType()); + } + } catch (DataValidationException e) { + if (e.getMessage().contains("files size limit is exhausted")) { + log.warn("[{}] Resource data size has been exhausted {}", tenantId, resourceUpdateMsg, e); + return Futures.immediateFuture(null); + } else { + return Futures.immediateFailedFuture(e); + } + } finally { + edgeSynchronizationManager.getSync().remove(); + } + return Futures.immediateFuture(null); + } + + public DownlinkMsg convertResourceEventToDownlink(EdgeEvent edgeEvent) { + TbResourceId tbResourceId = new TbResourceId(edgeEvent.getEntityId()); + DownlinkMsg downlinkMsg = null; + switch (edgeEvent.getAction()) { + case ADDED: + case UPDATED: + TbResource tbResource = resourceService.findResourceById(edgeEvent.getTenantId(), tbResourceId); + if (tbResource != null) { + UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); + ResourceUpdateMsg resourceUpdateMsg = + resourceMsgConstructor.constructResourceUpdatedMsg(msgType, tbResource); + downlinkMsg = DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addResourceUpdateMsg(resourceUpdateMsg) + .build(); + } + break; + case DELETED: + ResourceUpdateMsg resourceUpdateMsg = + resourceMsgConstructor.constructResourceDeleteMsg(tbResourceId); + downlinkMsg = DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addResourceUpdateMsg(resourceUpdateMsg) + .build(); + break; + } + return downlinkMsg; + } +} diff --git a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java index 410348762f..a54f0e2a5d 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.edge; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.google.protobuf.InvalidProtocolBufferException; @@ -393,7 +392,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { Assert.assertEquals(expectedRuleChainUUID, ruleChainUUID); } - private void validateAdminSettings() throws JsonProcessingException { + private void validateAdminSettings() { List adminSettingsUpdateMsgs = edgeImitator.findAllMessagesByType(AdminSettingsUpdateMsg.class); Assert.assertEquals(4, adminSettingsUpdateMsgs.size()); diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessorTest.java index d8c8041694..fcad4a4500 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessorTest.java @@ -40,5 +40,4 @@ class AssetEdgeProcessorTest extends AbstractAssetProcessorTest { verify(downlinkMsg, expectedDashboardIdMSB, expectedDashboardIdLSB, expectedRuleChainIdMSB, expectedRuleChainIdLSB); } - -} \ No newline at end of file +} diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessorTest.java index e0a6155664..bafad59050 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessorTest.java @@ -40,4 +40,4 @@ class AssetProfileEdgeProcessorTest extends AbstractAssetProcessorTest{ verify(downlinkMsg, expectedDashboardIdMSB, expectedDashboardIdLSB, expectedRuleChainIdMSB, expectedRuleChainIdLSB); } -} \ No newline at end of file +} diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessorTest.java index f64bd898d1..e40ea364be 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessorTest.java @@ -39,5 +39,4 @@ class DeviceEdgeProcessorTest extends AbstractDeviceProcessorTest { verify(downlinkMsg, expectedDashboardIdMSB, expectedDashboardIdLSB, expectedRuleChainIdMSB, expectedRuleChainIdLSB); } - -} \ No newline at end of file +} diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessorTest.java index 35d44852e9..c2e9dca077 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessorTest.java @@ -41,5 +41,4 @@ class DeviceProfileEdgeProcessorTest extends AbstractDeviceProcessorTest { verify(downlinkMsg, expectedDashboardIdMSB, expectedDashboardIdLSB, expectedRuleChainIdMSB, expectedRuleChainIdLSB); } - -} \ No newline at end of file +} diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/resource/ResourceService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/resource/ResourceService.java index 6f0e362209..5bc45ebc29 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/resource/ResourceService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/resource/ResourceService.java @@ -32,12 +32,16 @@ public interface ResourceService extends EntityDaoService { TbResource saveResource(TbResource resource); + TbResource saveResource(TbResource resource, boolean doValidate); + TbResource getResource(TenantId tenantId, ResourceType resourceType, String resourceId); TbResource findResourceById(TenantId tenantId, TbResourceId resourceId); TbResourceInfo findResourceInfoById(TenantId tenantId, TbResourceId resourceId); + PageData findAllTenantResources(TenantId tenantId, PageLink pageLink); + ListenableFuture findResourceInfoByIdAsync(TenantId tenantId, TbResourceId resourceId); PageData findAllTenantResourcesByTenantId(TbResourceInfoFilter filter, PageLink pageLink); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java index 1397deb5d7..5ecd8eab27 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java @@ -39,7 +39,8 @@ public enum EdgeEventType { WIDGET_TYPE(true, EntityType.WIDGET_TYPE), ADMIN_SETTINGS(true, null), OTA_PACKAGE(true, EntityType.OTA_PACKAGE), - QUEUE(true, EntityType.QUEUE); + QUEUE(true, EntityType.QUEUE), + TB_RESOURCE(true, EntityType.TB_RESOURCE); private final boolean allEdgesRelated; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java index 0cdf3ad1eb..cc66919c0f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java @@ -139,6 +139,8 @@ public class EntityIdFactory { return new EdgeId(uuid); case QUEUE: return new QueueId(uuid); + case TB_RESOURCE: + return new TbResourceId(uuid); } throw new IllegalArgumentException("EdgeEventType " + edgeEventType + " is not supported!"); } diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 50d5ad3c7f..ff117f864d 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -420,6 +420,19 @@ message TenantProfileUpdateMsg { bytes profileDataBytes = 8; } +message ResourceUpdateMsg { + UpdateMsgType msgType = 1; + int64 idMSB = 2; + int64 idLSB = 3; + string title = 4; + string resourceType = 5; + string resourceKey = 6; + string fileName = 7; + optional string data = 8; + optional string etag = 9; + bool isSystem = 10; +} + message RuleChainMetadataRequestMsg { int64 ruleChainIdMSB = 1; int64 ruleChainIdLSB = 2; @@ -571,6 +584,7 @@ message UplinkMsg { repeated EntityViewUpdateMsg entityViewUpdateMsg = 18; repeated AssetProfileUpdateMsg assetProfileUpdateMsg = 19; repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 20; + repeated ResourceUpdateMsg resourceUpdateMsg = 21; } message UplinkResponseMsg { @@ -613,5 +627,6 @@ message DownlinkMsg { EdgeConfiguration edgeConfiguration = 25; repeated TenantUpdateMsg tenantUpdateMsg = 26; repeated TenantProfileUpdateMsg tenantProfileUpdateMsg = 27; + repeated ResourceUpdateMsg resourceUpdateMsg = 28; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java b/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java index 7697217b6b..76ec3766d7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java @@ -21,10 +21,9 @@ import lombok.extern.slf4j.Slf4j; import org.hibernate.exception.ConstraintViolationException; import org.springframework.stereotype.Service; import org.springframework.transaction.event.TransactionalEventListener; -import org.thingsboard.server.cache.device.DeviceCacheKey; +import org.thingsboard.server.cache.resourceInfo.ResourceInfoCacheKey; import org.thingsboard.server.cache.resourceInfo.ResourceInfoEvictEvent; import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.cache.resourceInfo.ResourceInfoCacheKey; import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.TbResourceInfo; @@ -36,6 +35,8 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.AbstractCachedEntityService; +import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; +import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.PaginatedRemover; @@ -59,10 +60,23 @@ public class BaseResourceService extends AbstractCachedEntityService findAllTenantResources(TenantId tenantId, PageLink pageLink) { + log.trace("Executing findAllTenantResources [{}][{}]", tenantId, pageLink); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + return resourceDao.findAllByTenantId(tenantId, pageLink); + } + @Override public PageData findTenantResourcesByResourceTypeAndPageLink(TenantId tenantId, ResourceType resourceType, PageLink pageLink) { log.trace("Executing findTenantResourcesByResourceTypeAndPageLink [{}][{}][{}]", tenantId, resourceType, pageLink);