Resource support for edge

This commit is contained in:
Andrii Landiak 2023-09-11 17:04:25 +03:00
parent 3117af791c
commit 43c2ae653d
22 changed files with 430 additions and 13 deletions

View File

@ -48,6 +48,7 @@ import org.thingsboard.server.service.edge.rpc.processor.entityview.EntityViewEd
import org.thingsboard.server.service.edge.rpc.processor.ota.OtaPackageEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.ota.OtaPackageEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.queue.QueueEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.queue.QueueEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.relation.RelationEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.relation.RelationEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.resource.ResourceEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.rule.RuleChainEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.rule.RuleChainEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.tenant.TenantEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.tenant.TenantEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.tenant.TenantProfileEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.tenant.TenantProfileEdgeProcessor;
@ -125,6 +126,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
@Autowired @Autowired
private RelationEdgeProcessor relationProcessor; private RelationEdgeProcessor relationProcessor;
@Autowired
private ResourceEdgeProcessor resourceEdgeProcessor;
@Autowired @Autowired
protected ApplicationEventPublisher eventPublisher; protected ApplicationEventPublisher eventPublisher;
@ -215,6 +219,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
case TENANT_PROFILE: case TENANT_PROFILE:
future = tenantProfileEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg); future = tenantProfileEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break; break;
case TB_RESOURCE:
future = resourceEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg);
break;
default: default:
log.warn("[{}] Edge event type [{}] is not designed to be pushed to edge", tenantId, type); log.warn("[{}] Edge event type [{}] is not designed to be pushed to edge", tenantId, type);
future = Futures.immediateFuture(null); future = Futures.immediateFuture(null);

View File

@ -33,6 +33,7 @@ import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.ota.OtaPackageService; import org.thingsboard.server.dao.ota.OtaPackageService;
import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.settings.AdminSettingsService; import org.thingsboard.server.dao.settings.AdminSettingsService;
import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantProfileService;
@ -55,6 +56,7 @@ import org.thingsboard.server.service.edge.rpc.processor.entityview.EntityViewEd
import org.thingsboard.server.service.edge.rpc.processor.ota.OtaPackageEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.ota.OtaPackageEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.queue.QueueEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.queue.QueueEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.relation.RelationEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.relation.RelationEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.resource.ResourceEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.rule.RuleChainEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.rule.RuleChainEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.settings.AdminSettingsEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.settings.AdminSettingsEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.telemetry.TelemetryEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.telemetry.TelemetryEdgeProcessor;
@ -139,6 +141,9 @@ public class EdgeContextComponent {
@Autowired @Autowired
private QueueService queueService; private QueueService queueService;
@Autowired
private ResourceService resourceService;
@Autowired @Autowired
private AlarmEdgeProcessor alarmProcessor; private AlarmEdgeProcessor alarmProcessor;
@ -199,6 +204,9 @@ public class EdgeContextComponent {
@Autowired @Autowired
private TenantProfileEdgeProcessor tenantProfileEdgeProcessor; private TenantProfileEdgeProcessor tenantProfileEdgeProcessor;
@Autowired
private ResourceEdgeProcessor resourceEdgeProcessor;
@Autowired @Autowired
private EdgeMsgConstructor edgeMsgConstructor; private EdgeMsgConstructor edgeMsgConstructor;

View File

@ -63,6 +63,7 @@ import org.thingsboard.server.gen.edge.v1.RelationRequestMsg;
import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg; import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg;
import org.thingsboard.server.gen.edge.v1.RequestMsg; import org.thingsboard.server.gen.edge.v1.RequestMsg;
import org.thingsboard.server.gen.edge.v1.RequestMsgType; import org.thingsboard.server.gen.edge.v1.RequestMsgType;
import org.thingsboard.server.gen.edge.v1.ResourceUpdateMsg;
import org.thingsboard.server.gen.edge.v1.ResponseMsg; import org.thingsboard.server.gen.edge.v1.ResponseMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataRequestMsg; import org.thingsboard.server.gen.edge.v1.RuleChainMetadataRequestMsg;
import org.thingsboard.server.gen.edge.v1.SyncCompletedMsg; import org.thingsboard.server.gen.edge.v1.SyncCompletedMsg;
@ -645,6 +646,8 @@ public final class EdgeGrpcSession implements Closeable {
return ctx.getAdminSettingsProcessor().convertAdminSettingsEventToDownlink(edgeEvent); return ctx.getAdminSettingsProcessor().convertAdminSettingsEventToDownlink(edgeEvent);
case OTA_PACKAGE: case OTA_PACKAGE:
return ctx.getOtaPackageEdgeProcessor().convertOtaPackageEventToDownlink(edgeEvent); return ctx.getOtaPackageEdgeProcessor().convertOtaPackageEventToDownlink(edgeEvent);
case TB_RESOURCE:
return ctx.getResourceEdgeProcessor().convertResourceEventToDownlink(edgeEvent);
case QUEUE: case QUEUE:
return ctx.getQueueEdgeProcessor().convertQueueEventToDownlink(edgeEvent); return ctx.getQueueEdgeProcessor().convertQueueEventToDownlink(edgeEvent);
case TENANT: case TENANT:
@ -710,6 +713,11 @@ public final class EdgeGrpcSession implements Closeable {
result.add(ctx.getDashboardProcessor().processDashboardMsgFromEdge(edge.getTenantId(), edge, dashboardUpdateMsg)); result.add(ctx.getDashboardProcessor().processDashboardMsgFromEdge(edge.getTenantId(), edge, dashboardUpdateMsg));
} }
} }
if (uplinkMsg.getResourceUpdateMsgCount() > 0) {
for (ResourceUpdateMsg resourceUpdateMsg : uplinkMsg.getResourceUpdateMsgList()) {
result.add(ctx.getResourceEdgeProcessor().processResourceMsgFromEdge(edge.getTenantId(), resourceUpdateMsg));
}
}
if (uplinkMsg.getRuleChainMetadataRequestMsgCount() > 0) { if (uplinkMsg.getRuleChainMetadataRequestMsgCount() > 0) {
for (RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg : uplinkMsg.getRuleChainMetadataRequestMsgList()) { for (RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg : uplinkMsg.getRuleChainMetadataRequestMsgList()) {
result.add(ctx.getEdgeRequestsService().processRuleChainMetadataRequestMsg(edge.getTenantId(), edge, ruleChainMetadataRequestMsg)); result.add(ctx.getEdgeRequestsService().processRuleChainMetadataRequestMsg(edge.getTenantId(), edge, ruleChainMetadataRequestMsg));

View File

@ -33,10 +33,12 @@ import org.thingsboard.server.service.edge.rpc.fetch.EntityViewsEdgeEventFetcher
import org.thingsboard.server.service.edge.rpc.fetch.OtaPackagesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.OtaPackagesEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.QueuesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.QueuesEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.RuleChainsEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.RuleChainsEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.SystemResourcesEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.SystemWidgetTypesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.SystemWidgetTypesEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.SystemWidgetsBundlesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.SystemWidgetsBundlesEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.TenantAdminUsersEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.TenantAdminUsersEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.TenantEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.TenantEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.TenantResourcesEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetTypesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetTypesEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetsBundlesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetsBundlesEdgeEventFetcher;
@ -77,6 +79,8 @@ public class EdgeSyncCursor {
fetchers.add(new SystemWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService())); fetchers.add(new SystemWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService()));
fetchers.add(new TenantWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService())); fetchers.add(new TenantWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService()));
fetchers.add(new OtaPackagesEdgeEventFetcher(ctx.getOtaPackageService())); fetchers.add(new OtaPackagesEdgeEventFetcher(ctx.getOtaPackageService()));
fetchers.add(new SystemResourcesEdgeEventFetcher(ctx.getResourceService()));
fetchers.add(new TenantResourcesEdgeEventFetcher(ctx.getResourceService()));
} }
} }

View File

@ -0,0 +1,57 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.constructor;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.id.TbResourceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.edge.v1.ResourceUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
@Component
@TbCoreComponent
public class ResourceMsgConstructor {
public ResourceUpdateMsg constructResourceUpdatedMsg(UpdateMsgType msgType, TbResource tbResource) {
ResourceUpdateMsg.Builder builder = ResourceUpdateMsg.newBuilder()
.setMsgType(msgType)
.setIdMSB(tbResource.getId().getId().getMostSignificantBits())
.setIdLSB(tbResource.getId().getId().getLeastSignificantBits())
.setTitle(tbResource.getTitle())
.setResourceKey(tbResource.getResourceKey())
.setResourceType(tbResource.getResourceType().name())
.setFileName(tbResource.getFileName());
if (tbResource.getData() != null) {
builder.setData(tbResource.getData());
}
if (tbResource.getEtag() != null) {
builder.setEtag(tbResource.getEtag());
}
if (tbResource.getTenantId().equals(TenantId.SYS_TENANT_ID)) {
builder.setIsSystem(true);
}
return builder.build();
}
public ResourceUpdateMsg constructResourceDeleteMsg(TbResourceId tbResourceId) {
return ResourceUpdateMsg.newBuilder()
.setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE)
.setIdMSB(tbResourceId.getId().getMostSignificantBits())
.setIdLSB(tbResourceId.getId().getLeastSignificantBits()).build();
}
}

View File

@ -0,0 +1,49 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.resource.ResourceService;
@Slf4j
@AllArgsConstructor
public abstract class BaseResourceEdgeEventFetcher extends BasePageableEdgeEventFetcher<TbResource> {
protected final ResourceService resourceService;
@Override
PageData<TbResource> fetchPageData(TenantId tenantId, Edge edge, PageLink pageLink) {
return findTenantResources(tenantId, pageLink);
}
@Override
EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, TbResource tbResource) {
return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.TB_RESOURCE,
EdgeEventActionType.ADDED, tbResource.getId(), null);
}
protected abstract PageData<TbResource> findTenantResources(TenantId tenantId, PageLink pageLink);
}

View File

@ -0,0 +1,34 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.resource.ResourceService;
public class SystemResourcesEdgeEventFetcher extends BaseResourceEdgeEventFetcher {
public SystemResourcesEdgeEventFetcher(ResourceService resourceService) {
super(resourceService);
}
@Override
protected PageData<TbResource> findTenantResources(TenantId tenantId, PageLink pageLink) {
return resourceService.findAllTenantResources(TenantId.SYS_TENANT_ID, pageLink);
}
}

View File

@ -0,0 +1,36 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.resource.ResourceService;
@Slf4j
public class TenantResourcesEdgeEventFetcher extends BaseResourceEdgeEventFetcher {
public TenantResourcesEdgeEventFetcher(ResourceService resourceService) {
super(resourceService);
}
@Override
protected PageData<TbResource> findTenantResources(TenantId tenantId, PageLink pageLink) {
return resourceService.findAllTenantResources(tenantId, pageLink);
}
}

View File

@ -28,6 +28,7 @@ public class TenantWidgetsBundlesEdgeEventFetcher extends BaseWidgetsBundlesEdge
public TenantWidgetsBundlesEdgeEventFetcher(WidgetsBundleService widgetsBundleService) { public TenantWidgetsBundlesEdgeEventFetcher(WidgetsBundleService widgetsBundleService) {
super(widgetsBundleService); super(widgetsBundleService);
} }
@Override @Override
protected PageData<WidgetsBundle> findWidgetsBundles(TenantId tenantId, PageLink pageLink) { protected PageData<WidgetsBundle> findWidgetsBundles(TenantId tenantId, PageLink pageLink) {
return widgetsBundleService.findTenantWidgetsBundlesByTenantId(tenantId, pageLink); return widgetsBundleService.findTenantWidgetsBundlesByTenantId(tenantId, pageLink);

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetProfile; import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
@ -72,6 +73,7 @@ import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.ota.OtaPackageService; import org.thingsboard.server.dao.ota.OtaPackageService;
import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantProfileService;
@ -101,6 +103,7 @@ import org.thingsboard.server.service.edge.rpc.constructor.OtaPackageMsgConstruc
import org.thingsboard.server.service.edge.rpc.constructor.QueueMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.QueueMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.RelationMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.RelationMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.ResourceMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.TenantMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.TenantMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.TenantProfileMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.TenantProfileMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor;
@ -211,6 +214,9 @@ public abstract class BaseEdgeProcessor {
@Autowired @Autowired
protected PartitionService partitionService; protected PartitionService partitionService;
@Autowired
protected ResourceService resourceService;
@Autowired @Autowired
@Lazy @Lazy
protected TbQueueProducerProvider producerProvider; protected TbQueueProducerProvider producerProvider;
@ -233,6 +239,9 @@ public abstract class BaseEdgeProcessor {
@Autowired @Autowired
protected DataValidator<EntityView> entityViewValidator; protected DataValidator<EntityView> entityViewValidator;
@Autowired
protected DataValidator<TbResource> resourceValidator;
@Autowired @Autowired
protected EdgeMsgConstructor edgeMsgConstructor; protected EdgeMsgConstructor edgeMsgConstructor;
@ -293,6 +302,9 @@ public abstract class BaseEdgeProcessor {
@Autowired @Autowired
protected QueueMsgConstructor queueMsgConstructor; protected QueueMsgConstructor queueMsgConstructor;
@Autowired
protected ResourceMsgConstructor resourceMsgConstructor;
@Autowired @Autowired
protected EdgeSynchronizationManager edgeSynchronizationManager; protected EdgeSynchronizationManager edgeSynchronizationManager;

View File

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor.resource;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.ResourceType;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.TbResourceInfo;
import org.thingsboard.server.common.data.id.TbResourceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.edge.v1.ResourceUpdateMsg;
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
@Slf4j
public abstract class BaseResourceProcessor extends BaseEdgeProcessor {
protected void saveOrUpdateTbResource(TenantId tenantId, TbResourceId tbResourceId, ResourceUpdateMsg resourceUpdateMsg) {
try {
boolean created = false;
TbResource resource = resourceService.findResourceById(tenantId, tbResourceId);
if (resource == null) {
resource = new TbResource();
if (resourceUpdateMsg.getIsSystem()) {
resource.setTenantId(TenantId.SYS_TENANT_ID);
} else {
resource.setTenantId(tenantId);
}
resource.setCreatedTime(Uuids.unixTimestamp(tbResourceId.getId()));
created = true;
}
resource.setTitle(resourceUpdateMsg.getTitle());
resource.setResourceKey(resourceUpdateMsg.getResourceKey());
resource.setResourceType(ResourceType.valueOf(resourceUpdateMsg.getResourceType()));
resource.setFileName(resourceUpdateMsg.getFileName());
resource.setData(resourceUpdateMsg.hasData() ? resourceUpdateMsg.getData() : null);
resource.setEtag(resourceUpdateMsg.hasEtag() ? resourceUpdateMsg.getEtag() : null);
resourceValidator.validate(resource, TbResourceInfo::getTenantId);
if (created) {
resource.setId(tbResourceId);
}
resourceService.saveResource(resource, false);
} catch (Exception e) {
log.error("[{}] Failed to process resource update msg [{}]", tenantId, resourceUpdateMsg, e);
throw e;
}
}
}

View File

@ -0,0 +1,100 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor.resource;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.TbResourceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.ResourceUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.UUID;
@Component
@Slf4j
@TbCoreComponent
public class ResourceEdgeProcessor extends BaseResourceProcessor {
public ListenableFuture<Void> processResourceMsgFromEdge(TenantId tenantId, ResourceUpdateMsg resourceUpdateMsg) {
TbResourceId tbResourceId = new TbResourceId(new UUID(resourceUpdateMsg.getIdMSB(), resourceUpdateMsg.getIdLSB()));
try {
edgeSynchronizationManager.getSync().set(true);
switch (resourceUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
super.saveOrUpdateTbResource(tenantId, tbResourceId, resourceUpdateMsg);
break;
case ENTITY_DELETED_RPC_MESSAGE:
TbResource tbResourceToDelete = resourceService.findResourceById(tenantId, tbResourceId);
if (tbResourceToDelete != null) {
resourceService.deleteResource(tenantId, tbResourceId);
}
break;
case UNRECOGNIZED:
return handleUnsupportedMsgType(resourceUpdateMsg.getMsgType());
}
} catch (DataValidationException e) {
if (e.getMessage().contains("files size limit is exhausted")) {
log.warn("[{}] Resource data size has been exhausted {}", tenantId, resourceUpdateMsg, e);
return Futures.immediateFuture(null);
} else {
return Futures.immediateFailedFuture(e);
}
} finally {
edgeSynchronizationManager.getSync().remove();
}
return Futures.immediateFuture(null);
}
public DownlinkMsg convertResourceEventToDownlink(EdgeEvent edgeEvent) {
TbResourceId tbResourceId = new TbResourceId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (edgeEvent.getAction()) {
case ADDED:
case UPDATED:
TbResource tbResource = resourceService.findResourceById(edgeEvent.getTenantId(), tbResourceId);
if (tbResource != null) {
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
ResourceUpdateMsg resourceUpdateMsg =
resourceMsgConstructor.constructResourceUpdatedMsg(msgType, tbResource);
downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addResourceUpdateMsg(resourceUpdateMsg)
.build();
}
break;
case DELETED:
ResourceUpdateMsg resourceUpdateMsg =
resourceMsgConstructor.constructResourceDeleteMsg(tbResourceId);
downlinkMsg = DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addResourceUpdateMsg(resourceUpdateMsg)
.build();
break;
}
return downlinkMsg;
}
}

View File

@ -15,7 +15,6 @@
*/ */
package org.thingsboard.server.edge; package org.thingsboard.server.edge;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
@ -393,7 +392,7 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
Assert.assertEquals(expectedRuleChainUUID, ruleChainUUID); Assert.assertEquals(expectedRuleChainUUID, ruleChainUUID);
} }
private void validateAdminSettings() throws JsonProcessingException { private void validateAdminSettings() {
List<AdminSettingsUpdateMsg> adminSettingsUpdateMsgs = edgeImitator.findAllMessagesByType(AdminSettingsUpdateMsg.class); List<AdminSettingsUpdateMsg> adminSettingsUpdateMsgs = edgeImitator.findAllMessagesByType(AdminSettingsUpdateMsg.class);
Assert.assertEquals(4, adminSettingsUpdateMsgs.size()); Assert.assertEquals(4, adminSettingsUpdateMsgs.size());

View File

@ -40,5 +40,4 @@ class AssetEdgeProcessorTest extends AbstractAssetProcessorTest {
verify(downlinkMsg, expectedDashboardIdMSB, expectedDashboardIdLSB, expectedRuleChainIdMSB, expectedRuleChainIdLSB); verify(downlinkMsg, expectedDashboardIdMSB, expectedDashboardIdLSB, expectedRuleChainIdMSB, expectedRuleChainIdLSB);
} }
} }

View File

@ -39,5 +39,4 @@ class DeviceEdgeProcessorTest extends AbstractDeviceProcessorTest {
verify(downlinkMsg, expectedDashboardIdMSB, expectedDashboardIdLSB, expectedRuleChainIdMSB, expectedRuleChainIdLSB); verify(downlinkMsg, expectedDashboardIdMSB, expectedDashboardIdLSB, expectedRuleChainIdMSB, expectedRuleChainIdLSB);
} }
} }

View File

@ -41,5 +41,4 @@ class DeviceProfileEdgeProcessorTest extends AbstractDeviceProcessorTest {
verify(downlinkMsg, expectedDashboardIdMSB, expectedDashboardIdLSB, expectedRuleChainIdMSB, expectedRuleChainIdLSB); verify(downlinkMsg, expectedDashboardIdMSB, expectedDashboardIdLSB, expectedRuleChainIdMSB, expectedRuleChainIdLSB);
} }
} }

View File

@ -32,12 +32,16 @@ public interface ResourceService extends EntityDaoService {
TbResource saveResource(TbResource resource); TbResource saveResource(TbResource resource);
TbResource saveResource(TbResource resource, boolean doValidate);
TbResource getResource(TenantId tenantId, ResourceType resourceType, String resourceId); TbResource getResource(TenantId tenantId, ResourceType resourceType, String resourceId);
TbResource findResourceById(TenantId tenantId, TbResourceId resourceId); TbResource findResourceById(TenantId tenantId, TbResourceId resourceId);
TbResourceInfo findResourceInfoById(TenantId tenantId, TbResourceId resourceId); TbResourceInfo findResourceInfoById(TenantId tenantId, TbResourceId resourceId);
PageData<TbResource> findAllTenantResources(TenantId tenantId, PageLink pageLink);
ListenableFuture<TbResourceInfo> findResourceInfoByIdAsync(TenantId tenantId, TbResourceId resourceId); ListenableFuture<TbResourceInfo> findResourceInfoByIdAsync(TenantId tenantId, TbResourceId resourceId);
PageData<TbResourceInfo> findAllTenantResourcesByTenantId(TbResourceInfoFilter filter, PageLink pageLink); PageData<TbResourceInfo> findAllTenantResourcesByTenantId(TbResourceInfoFilter filter, PageLink pageLink);

View File

@ -39,7 +39,8 @@ public enum EdgeEventType {
WIDGET_TYPE(true, EntityType.WIDGET_TYPE), WIDGET_TYPE(true, EntityType.WIDGET_TYPE),
ADMIN_SETTINGS(true, null), ADMIN_SETTINGS(true, null),
OTA_PACKAGE(true, EntityType.OTA_PACKAGE), OTA_PACKAGE(true, EntityType.OTA_PACKAGE),
QUEUE(true, EntityType.QUEUE); QUEUE(true, EntityType.QUEUE),
TB_RESOURCE(true, EntityType.TB_RESOURCE);
private final boolean allEdgesRelated; private final boolean allEdgesRelated;

View File

@ -139,6 +139,8 @@ public class EntityIdFactory {
return new EdgeId(uuid); return new EdgeId(uuid);
case QUEUE: case QUEUE:
return new QueueId(uuid); return new QueueId(uuid);
case TB_RESOURCE:
return new TbResourceId(uuid);
} }
throw new IllegalArgumentException("EdgeEventType " + edgeEventType + " is not supported!"); throw new IllegalArgumentException("EdgeEventType " + edgeEventType + " is not supported!");
} }

View File

@ -420,6 +420,19 @@ message TenantProfileUpdateMsg {
bytes profileDataBytes = 8; bytes profileDataBytes = 8;
} }
message ResourceUpdateMsg {
UpdateMsgType msgType = 1;
int64 idMSB = 2;
int64 idLSB = 3;
string title = 4;
string resourceType = 5;
string resourceKey = 6;
string fileName = 7;
optional string data = 8;
optional string etag = 9;
bool isSystem = 10;
}
message RuleChainMetadataRequestMsg { message RuleChainMetadataRequestMsg {
int64 ruleChainIdMSB = 1; int64 ruleChainIdMSB = 1;
int64 ruleChainIdLSB = 2; int64 ruleChainIdLSB = 2;
@ -571,6 +584,7 @@ message UplinkMsg {
repeated EntityViewUpdateMsg entityViewUpdateMsg = 18; repeated EntityViewUpdateMsg entityViewUpdateMsg = 18;
repeated AssetProfileUpdateMsg assetProfileUpdateMsg = 19; repeated AssetProfileUpdateMsg assetProfileUpdateMsg = 19;
repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 20; repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 20;
repeated ResourceUpdateMsg resourceUpdateMsg = 21;
} }
message UplinkResponseMsg { message UplinkResponseMsg {
@ -613,5 +627,6 @@ message DownlinkMsg {
EdgeConfiguration edgeConfiguration = 25; EdgeConfiguration edgeConfiguration = 25;
repeated TenantUpdateMsg tenantUpdateMsg = 26; repeated TenantUpdateMsg tenantUpdateMsg = 26;
repeated TenantProfileUpdateMsg tenantProfileUpdateMsg = 27; repeated TenantProfileUpdateMsg tenantProfileUpdateMsg = 27;
repeated ResourceUpdateMsg resourceUpdateMsg = 28;
} }

View File

@ -21,10 +21,9 @@ import lombok.extern.slf4j.Slf4j;
import org.hibernate.exception.ConstraintViolationException; import org.hibernate.exception.ConstraintViolationException;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.event.TransactionalEventListener; import org.springframework.transaction.event.TransactionalEventListener;
import org.thingsboard.server.cache.device.DeviceCacheKey; import org.thingsboard.server.cache.resourceInfo.ResourceInfoCacheKey;
import org.thingsboard.server.cache.resourceInfo.ResourceInfoEvictEvent; import org.thingsboard.server.cache.resourceInfo.ResourceInfoEvictEvent;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.cache.resourceInfo.ResourceInfoCacheKey;
import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.common.data.ResourceType;
import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.TbResourceInfo;
@ -36,6 +35,8 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.entity.AbstractCachedEntityService; import org.thingsboard.server.dao.entity.AbstractCachedEntityService;
import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.service.PaginatedRemover; import org.thingsboard.server.dao.service.PaginatedRemover;
@ -59,10 +60,23 @@ public class BaseResourceService extends AbstractCachedEntityService<ResourceInf
@Override @Override
public TbResource saveResource(TbResource resource) { public TbResource saveResource(TbResource resource) {
return doSaveResource(resource, true);
}
@Override
public TbResource saveResource(TbResource resource, boolean doValidate) {
return doSaveResource(resource, doValidate);
}
private TbResource doSaveResource(TbResource resource, boolean doValidate) {
if (doValidate) {
resourceValidator.validate(resource, TbResourceInfo::getTenantId); resourceValidator.validate(resource, TbResourceInfo::getTenantId);
}
try { try {
TbResource saved = resourceDao.save(resource.getTenantId(), resource); TbResource saved = resourceDao.save(resource.getTenantId(), resource);
publishEvictEvent(new ResourceInfoEvictEvent(resource.getTenantId(), resource.getId())); publishEvictEvent(new ResourceInfoEvictEvent(resource.getTenantId(), resource.getId()));
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(saved.getTenantId())
.entityId(saved.getId()).added(saved.getId() == null).build());
return saved; return saved;
} catch (Exception t) { } catch (Exception t) {
publishEvictEvent(new ResourceInfoEvictEvent(resource.getTenantId(), resource.getId())); publishEvictEvent(new ResourceInfoEvictEvent(resource.getTenantId(), resource.getId()));
@ -111,6 +125,7 @@ public class BaseResourceService extends AbstractCachedEntityService<ResourceInf
Validator.validateId(resourceId, INCORRECT_RESOURCE_ID + resourceId); Validator.validateId(resourceId, INCORRECT_RESOURCE_ID + resourceId);
resourceValidator.validateDelete(tenantId, resourceId); resourceValidator.validateDelete(tenantId, resourceId);
resourceDao.removeById(tenantId, resourceId.getId()); resourceDao.removeById(tenantId, resourceId.getId());
eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entityId(resourceId).build());
} }
@Override @Override
@ -136,6 +151,13 @@ public class BaseResourceService extends AbstractCachedEntityService<ResourceInf
return resourceDao.findResourcesByTenantIdAndResourceType(tenantId, resourceType, objectIds, null); return resourceDao.findResourcesByTenantIdAndResourceType(tenantId, resourceType, objectIds, null);
} }
@Override
public PageData<TbResource> findAllTenantResources(TenantId tenantId, PageLink pageLink) {
log.trace("Executing findAllTenantResources [{}][{}]", tenantId, pageLink);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
return resourceDao.findAllByTenantId(tenantId, pageLink);
}
@Override @Override
public PageData<TbResource> findTenantResourcesByResourceTypeAndPageLink(TenantId tenantId, ResourceType resourceType, PageLink pageLink) { public PageData<TbResource> findTenantResourcesByResourceTypeAndPageLink(TenantId tenantId, ResourceType resourceType, PageLink pageLink) {
log.trace("Executing findTenantResourcesByResourceTypeAndPageLink [{}][{}][{}]", tenantId, resourceType, pageLink); log.trace("Executing findTenantResourcesByResourceTypeAndPageLink [{}][{}][{}]", tenantId, resourceType, pageLink);