diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index 6d368fbed8..4672900472 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -41,22 +41,22 @@ import org.thingsboard.server.dao.edge.EdgeEventService; import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.server.service.edge.rpc.processor.AlarmEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.AssetEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.AssetProfileEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.CustomerEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.DashboardEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.DeviceEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.DeviceProfileEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.EntityViewEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.OtaPackageEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.QueueEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.RelationEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.RuleChainEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.UserEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.WidgetBundleEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.WidgetTypeEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.asset.AssetProfileEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.customer.CustomerEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.dashboard.DashboardEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.device.DeviceEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.device.DeviceProfileEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.edge.EdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.entityview.EntityViewEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.ota.OtaPackageEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.queue.QueueEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.relation.RelationEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.rule.RuleChainEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.user.UserEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.widget.WidgetBundleEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.widget.WidgetTypeEdgeProcessor; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index 4adb73ebbb..d88fc67e91 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -40,24 +40,24 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings; import org.thingsboard.server.service.edge.rpc.constructor.EdgeMsgConstructor; -import org.thingsboard.server.service.edge.rpc.processor.AdminSettingsEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.AlarmEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.AssetEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.AssetProfileEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.CustomerEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.DashboardEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.DeviceEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.DeviceProfileEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.EntityViewEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.OtaPackageEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.QueueEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.RelationEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.RuleChainEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.TelemetryEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.UserEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.WidgetBundleEdgeProcessor; -import org.thingsboard.server.service.edge.rpc.processor.WidgetTypeEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.settings.AdminSettingsEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.asset.AssetProfileEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.customer.CustomerEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.dashboard.DashboardEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.device.DeviceEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.device.DeviceProfileEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.edge.EdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.entityview.EntityViewEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.ota.OtaPackageEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.queue.QueueEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.relation.RelationEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.rule.RuleChainEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.telemetry.TelemetryEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.user.UserEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.widget.WidgetBundleEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.widget.WidgetTypeEdgeProcessor; import org.thingsboard.server.service.edge.rpc.sync.EdgeRequestsService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.GrpcCallbackExecutorService; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 38ba0c0794..db19461aa4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -457,7 +457,6 @@ public final class EdgeGrpcSession implements Closeable { case ASSIGNED_TO_CUSTOMER: case UNASSIGNED_FROM_CUSTOMER: case CREDENTIALS_REQUEST: - case ENTITY_MERGE_REQUEST: case RPC_CALL: downlinkMsg = convertEntityEventToDownlink(edgeEvent); log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg); @@ -556,27 +555,27 @@ public final class EdgeGrpcSession implements Closeable { try { if (uplinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { - result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), entityData)); + result.addAll(ctx.getTelemetryProcessor().processTelemetryMsg(edge.getTenantId(), entityData)); } } if (uplinkMsg.getDeviceUpdateMsgCount() > 0) { for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) { - result.add(ctx.getDeviceProcessor().processDeviceFromEdge(edge.getTenantId(), edge, deviceUpdateMsg)); + result.add(ctx.getDeviceProcessor().processDeviceMsgFromEdge(edge.getTenantId(), edge, deviceUpdateMsg)); } } if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) { for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) { - result.add(ctx.getDeviceProcessor().processDeviceCredentialsFromEdge(edge.getTenantId(), deviceCredentialsUpdateMsg)); + result.add(ctx.getDeviceProcessor().processDeviceCredentialsMsg(edge.getTenantId(), deviceCredentialsUpdateMsg)); } } if (uplinkMsg.getAlarmUpdateMsgCount() > 0) { for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) { - result.add(ctx.getAlarmProcessor().processAlarmFromEdge(edge.getTenantId(), alarmUpdateMsg)); + result.add(ctx.getAlarmProcessor().processAlarmMsg(edge.getTenantId(), alarmUpdateMsg)); } } if (uplinkMsg.getRelationUpdateMsgCount() > 0) { for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) { - result.add(ctx.getRelationProcessor().processRelationFromEdge(edge.getTenantId(), relationUpdateMsg)); + result.add(ctx.getRelationProcessor().processRelationMsg(edge.getTenantId(), relationUpdateMsg)); } } if (uplinkMsg.getRuleChainMetadataRequestMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java index 5af2a1e737..c21e123970 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceMsgConstructor.java @@ -41,7 +41,7 @@ public class DeviceMsgConstructor { @Autowired private DataDecodingEncodingService dataDecodingEncodingService; - public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device, String conflictName) { + public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device) { DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder() .setMsgType(msgType) .setIdMSB(device.getId().getId().getMostSignificantBits()) @@ -70,9 +70,6 @@ public class DeviceMsgConstructor { builder.setSoftwareIdMSB(device.getSoftwareId().getId().getMostSignificantBits()) .setSoftwareIdLSB(device.getSoftwareId().getId().getLeastSignificantBits()); } - if (conflictName != null) { - builder.setConflictName(conflictName); - } if (device.getDeviceData() != null) { builder.setDeviceDataBytes(ByteString.copyFrom(dataDecodingEncodingService.encode(device.getDeviceData()))); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmEdgeProcessor.java deleted file mode 100644 index 5f4395fb47..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmEdgeProcessor.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.edge.rpc.processor; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; -import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.data.EdgeUtils; -import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.alarm.Alarm; -import org.thingsboard.server.common.data.alarm.AlarmSeverity; -import org.thingsboard.server.common.data.alarm.AlarmStatus; -import org.thingsboard.server.common.data.edge.EdgeEvent; -import org.thingsboard.server.common.data.edge.EdgeEventActionType; -import org.thingsboard.server.common.data.edge.EdgeEventType; -import org.thingsboard.server.common.data.id.AlarmId; -import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; -import org.thingsboard.server.gen.edge.v1.DownlinkMsg; -import org.thingsboard.server.gen.edge.v1.UpdateMsgType; -import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.util.TbCoreComponent; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -@Component -@Slf4j -@TbCoreComponent -public class AlarmEdgeProcessor extends BaseEdgeProcessor { - - public ListenableFuture processAlarmFromEdge(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { - log.trace("[{}] processAlarmFromEdge [{}]", tenantId, alarmUpdateMsg); - EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(), - EntityType.valueOf(alarmUpdateMsg.getOriginatorType())); - if (originatorId == null) { - log.warn("Originator not found for the alarm msg {}", alarmUpdateMsg); - return Futures.immediateFuture(null); - } - try { - Alarm existentAlarm = alarmService.findLatestByOriginatorAndType(tenantId, originatorId, alarmUpdateMsg.getType()).get(); - switch (alarmUpdateMsg.getMsgType()) { - case ENTITY_CREATED_RPC_MESSAGE: - case ENTITY_UPDATED_RPC_MESSAGE: - if (existentAlarm == null || existentAlarm.getStatus().isCleared()) { - existentAlarm = new Alarm(); - existentAlarm.setTenantId(tenantId); - existentAlarm.setType(alarmUpdateMsg.getName()); - existentAlarm.setOriginator(originatorId); - existentAlarm.setSeverity(AlarmSeverity.valueOf(alarmUpdateMsg.getSeverity())); - existentAlarm.setStartTs(alarmUpdateMsg.getStartTs()); - existentAlarm.setClearTs(alarmUpdateMsg.getClearTs()); - existentAlarm.setPropagate(alarmUpdateMsg.getPropagate()); - } - existentAlarm.setStatus(AlarmStatus.valueOf(alarmUpdateMsg.getStatus())); - existentAlarm.setAckTs(alarmUpdateMsg.getAckTs()); - existentAlarm.setEndTs(alarmUpdateMsg.getEndTs()); - existentAlarm.setDetails(JacksonUtil.OBJECT_MAPPER.readTree(alarmUpdateMsg.getDetails())); - alarmService.createOrUpdateAlarm(existentAlarm); - break; - case ALARM_ACK_RPC_MESSAGE: - if (existentAlarm != null) { - alarmService.ackAlarm(tenantId, existentAlarm.getId(), alarmUpdateMsg.getAckTs()); - } - break; - case ALARM_CLEAR_RPC_MESSAGE: - if (existentAlarm != null) { - alarmService.clearAlarm(tenantId, existentAlarm.getId(), - JacksonUtil.OBJECT_MAPPER.readTree(alarmUpdateMsg.getDetails()), alarmUpdateMsg.getAckTs()); - } - break; - case ENTITY_DELETED_RPC_MESSAGE: - if (existentAlarm != null) { - alarmService.deleteAlarm(tenantId, existentAlarm.getId()); - } - break; - } - return Futures.immediateFuture(null); - } catch (Exception e) { - log.error("Failed to process alarm update msg [{}]", alarmUpdateMsg, e); - return Futures.immediateFailedFuture(new RuntimeException("Failed to process alarm update msg", e)); - } - } - - private EntityId getAlarmOriginator(TenantId tenantId, String entityName, EntityType entityType) { - switch (entityType) { - case DEVICE: - return deviceService.findDeviceByTenantIdAndName(tenantId, entityName).getId(); - case ASSET: - return assetService.findAssetByTenantIdAndName(tenantId, entityName).getId(); - case ENTITY_VIEW: - return entityViewService.findEntityViewByTenantIdAndName(tenantId, entityName).getId(); - default: - return null; - } - } - - public DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent) { - AlarmId alarmId = new AlarmId(edgeEvent.getEntityId()); - DownlinkMsg downlinkMsg = null; - UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); - switch (edgeEvent.getAction()) { - case ADDED: - case UPDATED: - case ALARM_ACK: - case ALARM_CLEAR: - try { - Alarm alarm = alarmService.findAlarmByIdAsync(edgeEvent.getTenantId(), alarmId).get(); - if (alarm != null) { - downlinkMsg = DownlinkMsg.newBuilder() - .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) - .addAlarmUpdateMsg(alarmMsgConstructor.constructAlarmUpdatedMsg(edgeEvent.getTenantId(), msgType, alarm)) - .build(); - } - } catch (Exception e) { - log.error("Can't process alarm msg [{}] [{}]", edgeEvent, msgType, e); - } - break; - case DELETED: - Alarm alarm = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), Alarm.class); - AlarmUpdateMsg alarmUpdateMsg = - alarmMsgConstructor.constructAlarmUpdatedMsg(edgeEvent.getTenantId(), msgType, alarm); - downlinkMsg = DownlinkMsg.newBuilder() - .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) - .addAlarmUpdateMsg(alarmUpdateMsg) - .build(); - break; - } - return downlinkMsg; - } - - public ListenableFuture processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException { - EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); - AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); - switch (actionType) { - case DELETED: - EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); - Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class); - return saveEdgeEvent(tenantId, edgeId, EdgeEventType.ALARM, actionType, alarmId, JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm)); - default: - ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); - return Futures.transformAsync(alarmFuture, alarm -> { - if (alarm == null) { - return Futures.immediateFuture(null); - } - EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); - if (type == null) { - return Futures.immediateFuture(null); - } - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - List> futures = new ArrayList<>(); - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId relatedEdgeId : pageData.getData()) { - futures.add(saveEdgeEvent(tenantId, - relatedEdgeId, - EdgeEventType.ALARM, - EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), - alarmId, - null)); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); - return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); - }, dbCallbackExecutorService); - } - } - -} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 9776cf3a4d..4b94cee592 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -97,10 +97,14 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j public abstract class BaseEdgeProcessor { + protected static final Lock deviceCreationLock = new ReentrantLock(); + protected static final int DEFAULT_PAGE_SIZE = 100; @Autowired @@ -468,4 +472,17 @@ public abstract class BaseEdgeProcessor { return null; } } + + protected UUID safeGetUUID(long mSB, long lSB) { + return mSB != 0 && lSB != 0 ? new UUID(mSB, lSB) : null; + } + + protected CustomerId safeGetCustomerId(long mSB, long lSB) { + CustomerId customerId = null; + UUID customerUUID = safeGetUUID(mSB, lSB); + if (customerUUID != null) { + customerId = new CustomerId(customerUUID); + } + return customerId; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java new file mode 100644 index 0000000000..e37e90265b --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java @@ -0,0 +1,102 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.alarm; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.AlarmId; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; +import org.thingsboard.server.gen.edge.v1.DownlinkMsg; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +@Component +@Slf4j +@TbCoreComponent +public class AlarmEdgeProcessor extends BaseAlarmProcessor { + + public DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent) { + AlarmUpdateMsg alarmUpdateMsg = + convertAlarmEventToAlarmMsg(edgeEvent.getTenantId(), edgeEvent.getEntityId(), edgeEvent.getAction(), edgeEvent.getBody()); + if (alarmUpdateMsg != null) { + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addAlarmUpdateMsg(alarmUpdateMsg) + .build(); + } + return null; + } + + public ListenableFuture processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException { + EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); + AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); + switch (actionType) { + case DELETED: + EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); + Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class); + return saveEdgeEvent(tenantId, edgeId, EdgeEventType.ALARM, actionType, alarmId, JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm)); + default: + ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); + return Futures.transformAsync(alarmFuture, alarm -> { + if (alarm == null) { + return Futures.immediateFuture(null); + } + EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); + if (type == null) { + return Futures.immediateFuture(null); + } + PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); + PageData pageData; + List> futures = new ArrayList<>(); + do { + pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), pageLink); + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { + for (EdgeId relatedEdgeId : pageData.getData()) { + futures.add(saveEdgeEvent(tenantId, + relatedEdgeId, + EdgeEventType.ALARM, + EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), + alarmId, + null)); + } + if (pageData.hasNext()) { + pageLink = pageLink.nextPageLink(); + } + } + } while (pageData != null && pageData.hasNext()); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); + }, dbCallbackExecutorService); + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java new file mode 100644 index 0000000000..203f301c92 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java @@ -0,0 +1,127 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.alarm; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmSeverity; +import org.thingsboard.server.common.data.alarm.AlarmStatus; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.AlarmId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; + +import java.util.UUID; + +@Slf4j +public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { + + public ListenableFuture processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { + log.trace("[{}] processAlarmMsg [{}]", tenantId, alarmUpdateMsg); + EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(), + EntityType.valueOf(alarmUpdateMsg.getOriginatorType())); + if (originatorId == null) { + log.warn("Originator not found for the alarm msg {}", alarmUpdateMsg); + return Futures.immediateFuture(null); + } + try { + Alarm existentAlarm = alarmService.findLatestByOriginatorAndType(tenantId, originatorId, alarmUpdateMsg.getType()).get(); + switch (alarmUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + case ENTITY_UPDATED_RPC_MESSAGE: + if (existentAlarm == null || existentAlarm.getStatus().isCleared()) { + existentAlarm = new Alarm(); + existentAlarm.setTenantId(tenantId); + existentAlarm.setType(alarmUpdateMsg.getName()); + existentAlarm.setOriginator(originatorId); + existentAlarm.setSeverity(AlarmSeverity.valueOf(alarmUpdateMsg.getSeverity())); + existentAlarm.setStartTs(alarmUpdateMsg.getStartTs()); + existentAlarm.setClearTs(alarmUpdateMsg.getClearTs()); + existentAlarm.setPropagate(alarmUpdateMsg.getPropagate()); + } + existentAlarm.setStatus(AlarmStatus.valueOf(alarmUpdateMsg.getStatus())); + existentAlarm.setAckTs(alarmUpdateMsg.getAckTs()); + existentAlarm.setEndTs(alarmUpdateMsg.getEndTs()); + existentAlarm.setDetails(JacksonUtil.OBJECT_MAPPER.readTree(alarmUpdateMsg.getDetails())); + alarmService.createOrUpdateAlarm(existentAlarm); + break; + case ALARM_ACK_RPC_MESSAGE: + if (existentAlarm != null) { + alarmService.ackAlarm(tenantId, existentAlarm.getId(), alarmUpdateMsg.getAckTs()); + } + break; + case ALARM_CLEAR_RPC_MESSAGE: + if (existentAlarm != null) { + alarmService.clearAlarm(tenantId, existentAlarm.getId(), + JacksonUtil.OBJECT_MAPPER.readTree(alarmUpdateMsg.getDetails()), alarmUpdateMsg.getAckTs()); + } + break; + case ENTITY_DELETED_RPC_MESSAGE: + if (existentAlarm != null) { + alarmService.deleteAlarm(tenantId, existentAlarm.getId()); + } + break; + } + return Futures.immediateFuture(null); + } catch (Exception e) { + log.error("[{}] Failed to process alarm update msg [{}]", tenantId, alarmUpdateMsg, e); + return Futures.immediateFailedFuture(e); + } + } + + private EntityId getAlarmOriginator(TenantId tenantId, String entityName, EntityType entityType) { + switch (entityType) { + case DEVICE: + return deviceService.findDeviceByTenantIdAndName(tenantId, entityName).getId(); + case ASSET: + return assetService.findAssetByTenantIdAndName(tenantId, entityName).getId(); + case ENTITY_VIEW: + return entityViewService.findEntityViewByTenantIdAndName(tenantId, entityName).getId(); + default: + return null; + } + } + + public AlarmUpdateMsg convertAlarmEventToAlarmMsg(TenantId tenantId, UUID entityId, EdgeEventActionType actionType, JsonNode body) { + AlarmId alarmId = new AlarmId(entityId); + UpdateMsgType msgType = getUpdateMsgType(actionType); + switch (actionType) { + case ADDED: + case UPDATED: + case ALARM_ACK: + case ALARM_CLEAR: + Alarm alarm = alarmService.findAlarmById(tenantId, alarmId); + if (alarm != null) { + return alarmMsgConstructor.constructAlarmUpdatedMsg(tenantId, msgType, alarm); + } + break; + case DELETED: + Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.convertValue(body, Alarm.class); + return alarmMsgConstructor.constructAlarmUpdatedMsg(tenantId, msgType, deletedAlarm); + } + return null; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AssetEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java similarity index 96% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AssetEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java index 8f03523034..0404445b65 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AssetEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.asset; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -30,6 +30,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @Component @Slf4j diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AssetProfileEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java similarity index 95% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AssetProfileEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java index ce17c050b3..3b3c13ac4a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AssetProfileEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/asset/AssetProfileEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.asset; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @Component @Slf4j diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/customer/CustomerEdgeProcessor.java similarity index 97% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/customer/CustomerEdgeProcessor.java index 7d720fa671..e3d62181cc 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/customer/CustomerEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.customer; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -36,6 +36,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; import java.util.ArrayList; import java.util.List; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DashboardEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java similarity index 95% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DashboardEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java index 283df6954e..2b73496514 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DashboardEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.dashboard; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @Component @Slf4j diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java new file mode 100644 index 0000000000..0ba11e36af --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java @@ -0,0 +1,134 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.device; + +import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.util.Pair; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.device.data.DeviceData; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.OtaPackageId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.security.DeviceCredentials; +import org.thingsboard.server.common.data.security.DeviceCredentialsType; +import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; +import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg; +import org.thingsboard.server.queue.util.DataDecodingEncodingService; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; + +import java.util.Optional; +import java.util.UUID; + +@Slf4j +public abstract class BaseDeviceProcessor extends BaseEdgeProcessor { + + @Autowired + protected DataDecodingEncodingService dataDecodingEncodingService; + + protected Pair saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, CustomerId customerId) { + boolean created = false; + boolean deviceNameUpdated = false; + deviceCreationLock.lock(); + try { + Device device = deviceService.findDeviceById(tenantId, deviceId); + String deviceName = deviceUpdateMsg.getName(); + if (device == null) { + created = true; + device = new Device(); + device.setTenantId(tenantId); + device.setCreatedTime(Uuids.unixTimestamp(deviceId.getId())); + Device deviceByName = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName); + if (deviceByName != null) { + deviceName = deviceName + "_" + StringUtils.randomAlphabetic(15); + log.warn("Device with name {} already exists. Renaming device name to {}", + deviceUpdateMsg.getName(), deviceName); + deviceNameUpdated = true; + } + } + device.setName(deviceName); + device.setType(deviceUpdateMsg.getType()); + device.setLabel(deviceUpdateMsg.hasLabel() ? deviceUpdateMsg.getLabel() : null); + device.setAdditionalInfo(deviceUpdateMsg.hasAdditionalInfo() + ? JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo()) : null); + + UUID deviceProfileUUID = safeGetUUID(deviceUpdateMsg.getDeviceProfileIdMSB(), deviceUpdateMsg.getDeviceProfileIdLSB()); + device.setDeviceProfileId(deviceProfileUUID != null ? new DeviceProfileId(deviceProfileUUID) : null); + + device.setCustomerId(customerId); + + Optional deviceDataOpt = + dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray()); + device.setDeviceData(deviceDataOpt.orElse(null)); + + UUID firmwareUUID = safeGetUUID(deviceUpdateMsg.getFirmwareIdMSB(), deviceUpdateMsg.getFirmwareIdLSB()); + device.setFirmwareId(firmwareUUID != null ? new OtaPackageId(firmwareUUID) : null); + + UUID softwareUUID = safeGetUUID(deviceUpdateMsg.getSoftwareIdMSB(), deviceUpdateMsg.getSoftwareIdLSB()); + device.setSoftwareId(softwareUUID != null ? new OtaPackageId(softwareUUID) : null); + deviceValidator.validate(device, Device::getTenantId); + if (created) { + device.setId(deviceId); + } + Device savedDevice = deviceService.saveDevice(device, false); + if (created) { + DeviceCredentials deviceCredentials = new DeviceCredentials(); + deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId())); + deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); + deviceCredentials.setCredentialsId(StringUtils.randomAlphanumeric(20)); + deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials); + } + tbClusterService.onDeviceUpdated(savedDevice, created ? null : device, false); + } finally { + deviceCreationLock.unlock(); + } + return Pair.of(created, deviceNameUpdated); + } + + public ListenableFuture processDeviceCredentialsMsg(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { + log.debug("[{}] Executing processDeviceCredentialsMsg, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg); + DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB())); + ListenableFuture deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId); + return Futures.transform(deviceFuture, device -> { + if (device != null) { + log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]", + device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue()); + try { + DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, device.getId()); + deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType())); + deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId()); + deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.hasCredentialsValue() + ? deviceCredentialsUpdateMsg.getCredentialsValue() : null); + deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials); + } catch (Exception e) { + log.error("Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]", + device.getName(), deviceCredentialsUpdateMsg, e); + throw new RuntimeException(e); + } + } else { + log.warn("Can't find device by id [{}], deviceCredentialsUpdateMsg [{}]", deviceId, deviceCredentialsUpdateMsg); + } + return null; + }, dbCallbackExecutorService); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java similarity index 51% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java index 963f26212a..11f6d3f033 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java @@ -13,16 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.device; -import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.DataConstants; @@ -30,24 +29,19 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.data.device.data.DeviceData; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.data.security.DeviceCredentials; -import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -63,215 +57,57 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; -import java.util.Optional; import java.util.UUID; -import java.util.concurrent.locks.ReentrantLock; @Component @Slf4j @TbCoreComponent -public class DeviceEdgeProcessor extends BaseEdgeProcessor { +public class DeviceEdgeProcessor extends BaseDeviceProcessor { - @Autowired - private DataDecodingEncodingService dataDecodingEncodingService; - - private static final ReentrantLock deviceCreationLock = new ReentrantLock(); - - public ListenableFuture processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { - log.trace("[{}] processDeviceFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); - switch (deviceUpdateMsg.getMsgType()) { - case ENTITY_CREATED_RPC_MESSAGE: - String deviceName = deviceUpdateMsg.getName(); - Device device = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName); - if (device != null) { - boolean deviceAlreadyExistsForThisEdge = isDeviceAlreadyExistsOnCloudForThisEdge(tenantId, edge, device); - if (deviceAlreadyExistsForThisEdge) { - log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " + - "deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg); - return updateDevice(tenantId, edge, deviceUpdateMsg); - } else { - log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." + - "Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg); - String newDeviceName = deviceUpdateMsg.getName() + "_" + StringUtils.randomAlphabetic(15); - Device newDevice; - try { - newDevice = createDevice(tenantId, edge, deviceUpdateMsg, newDeviceName); - } catch (DataValidationException e) { - log.error("[{}] Device update msg can't be processed due to data validation [{}]", tenantId, deviceUpdateMsg, e); - return Futures.immediateFuture(null); - } - ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode(); - body.put("conflictName", deviceName); - ListenableFuture input = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body); - return Futures.transformAsync(input, unused -> - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null), - dbCallbackExecutorService); - } - } else { - log.info("[{}] Creating new device on the cloud [{}]", tenantId, deviceUpdateMsg); - try { - device = createDevice(tenantId, edge, deviceUpdateMsg, deviceUpdateMsg.getName()); - } catch (DataValidationException e) { - log.error("[{}] Device update msg can't be processed due to data validation [{}]", tenantId, deviceUpdateMsg, e); - return Futures.immediateFuture(null); - } - return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, device.getId(), null); - } - case ENTITY_UPDATED_RPC_MESSAGE: - return updateDevice(tenantId, edge, deviceUpdateMsg); - case ENTITY_DELETED_RPC_MESSAGE: - DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); - Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId); - if (deviceToDelete != null) { - deviceService.unassignDeviceFromEdge(tenantId, deviceId, edge.getId()); - } - return Futures.immediateFuture(null); - case UNRECOGNIZED: - default: - return handleUnsupportedMsgType(deviceUpdateMsg.getMsgType()); - } - } - - private boolean isDeviceAlreadyExistsOnCloudForThisEdge(TenantId tenantId, Edge edge, Device device) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, device.getId(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - if (pageData.getData().contains(edge.getId())) { - return true; - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); - return false; - } - - public ListenableFuture processDeviceCredentialsFromEdge(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { - log.debug("[{}] Executing processDeviceCredentialsFromEdge, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg); - DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB())); - ListenableFuture deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId); - return Futures.transform(deviceFuture, device -> { - if (device != null) { - log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]", - device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue()); - try { - DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, device.getId()); - deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType())); - deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId()); - if (deviceCredentialsUpdateMsg.hasCredentialsValue()) { - deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.getCredentialsValue()); - } - deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials); - } catch (Exception e) { - log.error("Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]", device.getName(), deviceCredentialsUpdateMsg, e); - throw new RuntimeException(e); - } - } - return null; - }, dbCallbackExecutorService); - } - - - private ListenableFuture updateDevice(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { + public ListenableFuture processDeviceMsgFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { + log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); - Device device = deviceService.findDeviceById(tenantId, deviceId); - if (device != null) { - device.setName(deviceUpdateMsg.getName()); - device.setType(deviceUpdateMsg.getType()); - if (deviceUpdateMsg.hasLabel()) { - device.setLabel(deviceUpdateMsg.getLabel()); - } - if (deviceUpdateMsg.hasAdditionalInfo()) { - device.setAdditionalInfo(JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo())); - } - if (deviceUpdateMsg.hasDeviceProfileIdMSB() && deviceUpdateMsg.hasDeviceProfileIdLSB()) { - DeviceProfileId deviceProfileId = new DeviceProfileId( - new UUID(deviceUpdateMsg.getDeviceProfileIdMSB(), - deviceUpdateMsg.getDeviceProfileIdLSB())); - device.setDeviceProfileId(deviceProfileId); - } - device.setCustomerId(getCustomerId(deviceUpdateMsg)); - Optional deviceDataOpt = - dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray()); - deviceDataOpt.ifPresent(device::setDeviceData); - Device savedDevice = deviceService.saveDevice(device); - tbClusterService.onDeviceUpdated(savedDevice, device, false); - return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null); - } else { - String errMsg = String.format("[%s] can't find device [%s], edge [%s]", tenantId, deviceUpdateMsg, edge.getId()); - log.warn(errMsg); - return Futures.immediateFailedFuture(new RuntimeException(errMsg)); - } - } - - private Device createDevice(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg, String deviceName) { - Device device; - deviceCreationLock.lock(); try { - log.debug("[{}] Creating device entity [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); - DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); - device = deviceService.findDeviceById(tenantId, deviceId); - boolean created = false; - if (device == null) { - device = new Device(); - device.setTenantId(tenantId); - device.setCreatedTime(Uuids.unixTimestamp(deviceId.getId())); - created = true; + switch (deviceUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + case ENTITY_UPDATED_RPC_MESSAGE: + saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, edge); + return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null); + case ENTITY_DELETED_RPC_MESSAGE: + Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId); + if (deviceToDelete != null) { + deviceService.unassignDeviceFromEdge(tenantId, deviceId, edge.getId()); + } + return Futures.immediateFuture(null); + case UNRECOGNIZED: + default: + return handleUnsupportedMsgType(deviceUpdateMsg.getMsgType()); } - device.setName(deviceName); - device.setType(deviceUpdateMsg.getType()); - if (deviceUpdateMsg.hasLabel()) { - device.setLabel(deviceUpdateMsg.getLabel()); + } catch (DataValidationException e) { + if (e.getMessage().contains("Can't create more then")) { + log.warn("[{}] Number of allowed devices violated {}", tenantId, deviceUpdateMsg, e); + return Futures.immediateFuture(null); + } else { + return Futures.immediateFailedFuture(e); } - if (deviceUpdateMsg.hasAdditionalInfo()) { - device.setAdditionalInfo(JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo())); - } - if (deviceUpdateMsg.hasDeviceProfileIdMSB() && deviceUpdateMsg.hasDeviceProfileIdLSB()) { - DeviceProfileId deviceProfileId = new DeviceProfileId( - new UUID(deviceUpdateMsg.getDeviceProfileIdMSB(), - deviceUpdateMsg.getDeviceProfileIdLSB())); - device.setDeviceProfileId(deviceProfileId); - } - device.setCustomerId(getCustomerId(deviceUpdateMsg)); - Optional deviceDataOpt = - dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray()); - if (deviceDataOpt.isPresent()) { - device.setDeviceData(deviceDataOpt.get()); - } - deviceValidator.validate(device, Device::getTenantId); - if (created) { - device.setId(deviceId); - } - Device savedDevice = deviceService.saveDevice(device, false); - tbClusterService.onDeviceUpdated(savedDevice, created ? null : device, false); - if (created) { - DeviceCredentials deviceCredentials = new DeviceCredentials(); - deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId())); - deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); - deviceCredentials.setCredentialsId(StringUtils.randomAlphanumeric(20)); - deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials); - } - createRelationFromEdge(tenantId, edge.getId(), device.getId()); - pushDeviceCreatedEventToRuleEngine(tenantId, edge, device); - deviceService.assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId()); - } finally { - deviceCreationLock.unlock(); } - return device; } - private CustomerId getCustomerId(DeviceUpdateMsg deviceUpdateMsg) { - if (deviceUpdateMsg.hasCustomerIdMSB() && deviceUpdateMsg.hasCustomerIdLSB()) { - return new CustomerId(new UUID(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB())); - } else { - return null; + private void saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, Edge edge) { + CustomerId customerId = safeGetCustomerId(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB()); + Pair resultPair = super.saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, customerId); + Boolean created = resultPair.getFirst(); + if (created) { + createRelationFromEdge(tenantId, edge.getId(), deviceId); + pushDeviceCreatedEventToRuleEngine(tenantId, edge, deviceId); + deviceService.assignDeviceToEdge(tenantId, deviceId, edge.getId()); + } + Boolean deviceNameUpdated = resultPair.getSecond(); + if (deviceNameUpdated) { + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.UPDATED, deviceId, null); } } @@ -284,9 +120,9 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { relationService.saveRelation(tenantId, relation); } - private void pushDeviceCreatedEventToRuleEngine(TenantId tenantId, Edge edge, Device device) { + private void pushDeviceCreatedEventToRuleEngine(TenantId tenantId, Edge edge, DeviceId deviceId) { try { - DeviceId deviceId = device.getId(); + Device device = deviceService.findDeviceById(tenantId, deviceId); ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(device); TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, device.getCustomerId(), getActionTbMsgMetaData(edge, device.getCustomerId()), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); @@ -302,7 +138,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { } }); } catch (JsonProcessingException | IllegalArgumentException e) { - log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), DataConstants.ENTITY_CREATED, e); + log.warn("[{}] Failed to push device action to rule engine: {}", deviceId, DataConstants.ENTITY_CREATED, e); } } @@ -415,7 +251,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { if (device != null) { UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); DeviceUpdateMsg deviceUpdateMsg = - deviceMsgConstructor.constructDeviceUpdatedMsg(msgType, device, null); + deviceMsgConstructor.constructDeviceUpdatedMsg(msgType, device); DownlinkMsg.Builder builder = DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) .addDeviceUpdateMsg(deviceUpdateMsg); @@ -450,8 +286,6 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { return convertRpcCallEventToDownlink(edgeEvent); case CREDENTIALS_REQUEST: return convertCredentialsRequestEventToDownlink(edgeEvent); - case ENTITY_MERGE_REQUEST: - return convertEntityMergeRequestEventToDownlink(edgeEvent); } return downlinkMsg; } @@ -475,21 +309,6 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { return builder.build(); } - public DownlinkMsg convertEntityMergeRequestEventToDownlink(EdgeEvent edgeEvent) { - DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); - Device device = deviceService.findDeviceById(edgeEvent.getTenantId(), deviceId); - String conflictName = null; - if(edgeEvent.getBody() != null) { - conflictName = edgeEvent.getBody().get("conflictName").asText(); - } - DeviceUpdateMsg deviceUpdateMsg = deviceMsgConstructor - .constructDeviceUpdatedMsg(UpdateMsgType.ENTITY_MERGE_RPC_MESSAGE, device, conflictName); - return DownlinkMsg.newBuilder() - .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) - .addDeviceUpdateMsg(deviceUpdateMsg) - .build(); - } - public ListenableFuture processDeviceNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { return processEntityNotification(tenantId, edgeNotificationMsg); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProfileEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java similarity index 95% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProfileEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java index d5d36ee640..d187c23f49 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProfileEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceProfileEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.device; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @Component @Slf4j diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeProcessor.java similarity index 97% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeProcessor.java index d4f6998926..536e1a8194 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/edge/EdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.edge; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -35,6 +35,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EdgeConfiguration; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; import java.util.ArrayList; import java.util.List; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityViewEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java similarity index 95% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityViewEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java index b89ce15a5d..8029ccc14a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityViewEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/entityview/EntityViewEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.entityview; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.EntityViewUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @Component @Slf4j diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/OtaPackageEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ota/OtaPackageEdgeProcessor.java similarity index 95% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/OtaPackageEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ota/OtaPackageEdgeProcessor.java index 37ae5142af..e3431f952c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/OtaPackageEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/ota/OtaPackageEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.ota; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.OtaPackageUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @Component @Slf4j diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/QueueEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/queue/QueueEdgeProcessor.java similarity index 95% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/QueueEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/queue/QueueEdgeProcessor.java index 6a7e66a58a..f216002055 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/QueueEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/queue/QueueEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.queue; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.QueueUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @Component @Slf4j diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java similarity index 54% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java index b2bec3559d..e1c937eaad 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java @@ -13,21 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.relation; -import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.edge.EdgeEvent; -import org.thingsboard.server.common.data.edge.EdgeEventActionType; -import org.thingsboard.server.common.data.edge.EdgeEventType; -import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; -import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DashboardId; @@ -40,24 +32,15 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; -import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg; -import org.thingsboard.server.gen.edge.v1.UpdateMsgType; -import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import java.util.UUID; -@Component @Slf4j -@TbCoreComponent -public class RelationEdgeProcessor extends BaseEdgeProcessor { +public abstract class BaseRelationProcessor extends BaseEdgeProcessor { - public ListenableFuture processRelationFromEdge(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) { + public ListenableFuture processRelationMsg(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) { log.trace("[{}] processRelationFromEdge [{}]", tenantId, relationUpdateMsg); try { EntityRelation entityRelation = new EntityRelation(); @@ -71,10 +54,9 @@ public class RelationEdgeProcessor extends BaseEdgeProcessor { entityRelation.setTo(toId); entityRelation.setType(relationUpdateMsg.getType()); - if (relationUpdateMsg.hasTypeGroup()) { - entityRelation.setTypeGroup(RelationTypeGroup.valueOf(relationUpdateMsg.getTypeGroup())); - } - entityRelation.setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.readTree(relationUpdateMsg.getAdditionalInfo())); + entityRelation.setTypeGroup(relationUpdateMsg.hasTypeGroup() + ? RelationTypeGroup.valueOf(relationUpdateMsg.getTypeGroup()) : RelationTypeGroup.COMMON); + entityRelation.setAdditionalInfo(JacksonUtil.toJsonNode(relationUpdateMsg.getAdditionalInfo())); switch (relationUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: case ENTITY_UPDATED_RPC_MESSAGE: @@ -94,12 +76,12 @@ public class RelationEdgeProcessor extends BaseEdgeProcessor { return handleUnsupportedMsgType(relationUpdateMsg.getMsgType()); } } catch (Exception e) { - log.error("Failed to process relation update msg [{}]", relationUpdateMsg, e); - return Futures.immediateFailedFuture(new RuntimeException("Failed to process relation update msg", e)); + log.error("[{}] Failed to process relation update msg [{}]", tenantId, relationUpdateMsg, e); + return Futures.immediateFailedFuture(e); } } - private boolean isEntityExists(TenantId tenantId, EntityId entityId) throws ThingsboardException { + private boolean isEntityExists(TenantId tenantId, EntityId entityId) { switch (entityId.getEntityType()) { case DEVICE: return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null; @@ -113,43 +95,10 @@ public class RelationEdgeProcessor extends BaseEdgeProcessor { return userService.findUserById(tenantId, new UserId(entityId.getId())) != null; case DASHBOARD: return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null; + case EDGE: + return edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null; default: - throw new ThingsboardException("Unsupported entity type " + entityId.getEntityType(), ThingsboardErrorCode.INVALID_ARGUMENTS); + return false; } } - - public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) { - EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class); - UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); - RelationUpdateMsg relationUpdateMsg = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation); - return DownlinkMsg.newBuilder() - .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) - .addRelationUpdateMsg(relationUpdateMsg) - .build(); - } - - public ListenableFuture processRelationNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException { - EntityRelation relation = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), EntityRelation.class); - if (relation.getFrom().getEntityType().equals(EntityType.EDGE) || - relation.getTo().getEntityType().equals(EntityType.EDGE)) { - return Futures.immediateFuture(null); - } - - Set uniqueEdgeIds = new HashSet<>(); - uniqueEdgeIds.addAll(edgeService.findAllRelatedEdgeIds(tenantId, relation.getTo())); - uniqueEdgeIds.addAll(edgeService.findAllRelatedEdgeIds(tenantId, relation.getFrom())); - if (uniqueEdgeIds.isEmpty()) { - return Futures.immediateFuture(null); - } - List> futures = new ArrayList<>(); - for (EdgeId edgeId : uniqueEdgeIds) { - futures.add(saveEdgeEvent(tenantId, - edgeId, - EdgeEventType.RELATION, - EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), - null, - JacksonUtil.OBJECT_MAPPER.valueToTree(relation))); - } - return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); - } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java new file mode 100644 index 0000000000..07e25e4c23 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/RelationEdgeProcessor.java @@ -0,0 +1,82 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.relation; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.gen.edge.v1.DownlinkMsg; +import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@Component +@Slf4j +@TbCoreComponent +public class RelationEdgeProcessor extends BaseRelationProcessor { + + public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) { + EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class); + UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); + RelationUpdateMsg relationUpdateMsg = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation); + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addRelationUpdateMsg(relationUpdateMsg) + .build(); + } + + public ListenableFuture processRelationNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException { + EntityRelation relation = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), EntityRelation.class); + if (relation.getFrom().getEntityType().equals(EntityType.EDGE) || + relation.getTo().getEntityType().equals(EntityType.EDGE)) { + return Futures.immediateFuture(null); + } + + Set uniqueEdgeIds = new HashSet<>(); + uniqueEdgeIds.addAll(edgeService.findAllRelatedEdgeIds(tenantId, relation.getTo())); + uniqueEdgeIds.addAll(edgeService.findAllRelatedEdgeIds(tenantId, relation.getFrom())); + if (uniqueEdgeIds.isEmpty()) { + return Futures.immediateFuture(null); + } + List> futures = new ArrayList<>(); + for (EdgeId edgeId : uniqueEdgeIds) { + futures.add(saveEdgeEvent(tenantId, + edgeId, + EdgeEventType.RELATION, + EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()), + null, + JacksonUtil.OBJECT_MAPPER.valueToTree(relation))); + } + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RuleChainEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java similarity index 97% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RuleChainEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java index 22d8005472..ed418eef03 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RuleChainEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/rule/RuleChainEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.rule; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -31,6 +31,7 @@ import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; import static org.thingsboard.server.service.edge.DefaultEdgeNotificationService.EDGE_IS_ROOT_BODY_KEY; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AdminSettingsEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/settings/AdminSettingsEdgeProcessor.java similarity index 92% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AdminSettingsEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/settings/AdminSettingsEdgeProcessor.java index c3d3df4492..a202fb39ec 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AdminSettingsEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/settings/AdminSettingsEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.settings; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @Component @Slf4j diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java similarity index 87% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index 7e1a459e3a..30979187d1 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -13,9 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.telemetry; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -26,19 +27,16 @@ import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; -import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; -import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.AssetProfile; import org.thingsboard.server.common.data.edge.Edge; -import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; @@ -60,24 +58,22 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg; -import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; import javax.annotation.Nullable; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; +import java.util.UUID; -@Component @Slf4j -@TbCoreComponent -public class TelemetryEdgeProcessor extends BaseEdgeProcessor { +public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { private final Gson gson = new Gson(); @@ -88,15 +84,17 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); } - public List> processTelemetryFromEdge(TenantId tenantId, EntityDataProto entityData) { - log.trace("[{}] processTelemetryFromEdge [{}]", tenantId, entityData); + abstract protected String getMsgSourceKey(); + + public List> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) { + log.trace("[{}] processTelemetryMsg [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData.getEntityType(), entityData.getEntityIdMSB(), entityData.getEntityIdLSB()); if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) { Pair pair = getBaseMsgMetadataAndCustomerId(tenantId, entityId); TbMsgMetaData metaData = pair.getKey(); CustomerId customerId = pair.getValue(); - metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); + metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey()); if (entityData.hasPostAttributesMsg()) { result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); } @@ -283,11 +281,11 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { String entityType) { SettableFuture futureToSet = SettableFuture.create(); String scope = attributeDeleteMsg.getScope(); - List attributeNames = attributeDeleteMsg.getAttributeNamesList(); - attributesService.removeAll(tenantId, entityId, scope, attributeNames); + List attributeKeys = attributeDeleteMsg.getAttributeNamesList(); + attributesService.removeAll(tenantId, entityId, scope, attributeKeys); if (EntityType.DEVICE.name().equals(entityType)) { tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete( - tenantId, (DeviceId) entityId, scope, attributeNames), new TbQueueCallback() { + tenantId, (DeviceId) entityId, scope, attributeKeys), new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { futureToSet.set(null); @@ -303,47 +301,42 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { return futureToSet; } - public DownlinkMsg convertTelemetryEventToDownlink(EdgeEvent edgeEvent) throws JsonProcessingException { + public EntityDataProto convertTelemetryEventToEntityDataProto(EntityType entityType, + UUID entityUUID, + EdgeEventActionType actionType, + JsonNode body) throws JsonProcessingException { EntityId entityId; - switch (edgeEvent.getType()) { + switch (entityType) { case DEVICE: - entityId = new DeviceId(edgeEvent.getEntityId()); + entityId = new DeviceId(entityUUID); break; case ASSET: - entityId = new AssetId(edgeEvent.getEntityId()); + entityId = new AssetId(entityUUID); break; case ENTITY_VIEW: - entityId = new EntityViewId(edgeEvent.getEntityId()); + entityId = new EntityViewId(entityUUID); break; case DASHBOARD: - entityId = new DashboardId(edgeEvent.getEntityId()); + entityId = new DashboardId(entityUUID); break; case TENANT: - entityId = TenantId.fromUUID(edgeEvent.getEntityId()); + entityId = TenantId.fromUUID(entityUUID); break; case CUSTOMER: - entityId = new CustomerId(edgeEvent.getEntityId()); + entityId = new CustomerId(entityUUID); break; case USER: - entityId = new UserId(edgeEvent.getEntityId()); + entityId = new UserId(entityUUID); break; case EDGE: - entityId = new EdgeId(edgeEvent.getEntityId()); + entityId = new EdgeId(entityUUID); break; default: - log.warn("Unsupported edge event type [{}]", edgeEvent); + log.warn("Unsupported edge event type [{}]", entityType); return null; } - return constructEntityDataProtoMsg(entityId, edgeEvent.getAction(), - JsonParser.parseString(JacksonUtil.OBJECT_MAPPER.writeValueAsString(edgeEvent.getBody()))); - } - - private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, EdgeEventActionType actionType, JsonElement entityData) { - EntityDataProto entityDataProto = entityDataMsgConstructor.constructEntityDataMsg(entityId, actionType, entityData); - return DownlinkMsg.newBuilder() - .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) - .addEntityData(entityDataProto) - .build(); + JsonElement entityData = JsonParser.parseString(JacksonUtil.OBJECT_MAPPER.writeValueAsString(body)); + return entityDataMsgConstructor.constructEntityDataMsg(entityId, actionType, entityData); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessor.java new file mode 100644 index 0000000000..ffdb76b1ce --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessor.java @@ -0,0 +1,48 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.telemetry; + +import com.fasterxml.jackson.core.JsonProcessingException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.gen.edge.v1.DownlinkMsg; +import org.thingsboard.server.gen.edge.v1.EntityDataProto; +import org.thingsboard.server.queue.util.TbCoreComponent; + +@Component +@Slf4j +@TbCoreComponent +public class TelemetryEdgeProcessor extends BaseTelemetryProcessor { + + @Override + protected String getMsgSourceKey() { + return DataConstants.EDGE_MSG_SOURCE; + } + + public DownlinkMsg convertTelemetryEventToDownlink(EdgeEvent edgeEvent) throws JsonProcessingException { + EntityType entityType = EntityType.valueOf(edgeEvent.getType().name()); + EntityDataProto entityDataProto = convertTelemetryEventToEntityDataProto(entityType, edgeEvent.getEntityId(), + edgeEvent.getAction(), edgeEvent.getBody()); + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addEntityData(entityDataProto) + .build(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/UserEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/user/UserEdgeProcessor.java similarity index 96% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/UserEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/user/UserEdgeProcessor.java index 390fafe464..c88435bb5c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/UserEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/user/UserEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.user; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -29,6 +29,7 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.UserCredentialsUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @Component @Slf4j diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetBundleEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/widget/WidgetBundleEdgeProcessor.java similarity index 95% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetBundleEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/widget/WidgetBundleEdgeProcessor.java index b74e3c1cec..79083a127a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetBundleEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/widget/WidgetBundleEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.widget; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.WidgetsBundleUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @Component @Slf4j diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetTypeEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/widget/WidgetTypeEdgeProcessor.java similarity index 95% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetTypeEdgeProcessor.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/widget/WidgetTypeEdgeProcessor.java index 10807b7b5c..1825f993b9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetTypeEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/widget/WidgetTypeEdgeProcessor.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.processor; +package org.thingsboard.server.service.edge.rpc.processor.widget; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; @@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.WidgetTypeUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; @Component @Slf4j diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseEdgeControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseEdgeControllerTest.java index 3471c89c5d..5432f3a084 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseEdgeControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseEdgeControllerTest.java @@ -17,6 +17,10 @@ package org.thingsboard.server.controller; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -28,6 +32,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.TestPropertySource; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntitySubtype; @@ -58,13 +63,10 @@ import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg; import org.thingsboard.server.gen.edge.v1.UserCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.UserUpdateMsg; -import java.io.File; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.Matchers.containsString; @@ -86,6 +88,10 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest { private TenantId tenantId; private User tenantAdmin; + ListeningExecutorService executor; + + List> futures; + @Autowired private EdgeDao edgeDao; @@ -99,6 +105,8 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest { @Before public void beforeTest() throws Exception { + executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(8, getClass())); + loginSysAdmin(); Tenant tenant = new Tenant(); @@ -119,6 +127,8 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest { @After public void afterTest() throws Exception { + executor.shutdownNow(); + loginSysAdmin(); doDelete("/api/tenant/" + savedTenant.getId().getId().toString()) @@ -385,11 +395,14 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest { @Test public void testFindTenantEdges() throws Exception { - List edges = new ArrayList<>(); - for (int i = 0; i < 178; i++) { + int cntEntity = 178; + futures = new ArrayList<>(cntEntity); + for (int i = 0; i < cntEntity; i++) { Edge edge = constructEdge("Edge" + i, "default"); - edges.add(doPost("/api/edge", edge, Edge.class)); + futures.add(executor.submit(() -> + doPost("/api/edge", edge, Edge.class))); } + List edges = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS)); List loadedEdges = new ArrayList<>(); PageLink pageLink = new PageLink(23); PageData pageData = null; @@ -412,23 +425,30 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest { @Test public void testFindTenantEdgesByName() throws Exception { String title1 = "Edge title 1"; - List edgesTitle1 = new ArrayList<>(); - for (int i = 0; i < 143; i++) { + int cntEntity = 143; + futures = new ArrayList<>(cntEntity); + for (int i = 0; i < cntEntity; i++) { String suffix = StringUtils.randomAlphanumeric(15); String name = title1 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); Edge edge = constructEdge(name, "default"); - edgesTitle1.add(doPost("/api/edge", edge, Edge.class)); + futures.add(executor.submit(() -> + doPost("/api/edge", edge, Edge.class))); } + List edgesTitle1 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS)); + String title2 = "Edge title 2"; - List edgesTitle2 = new ArrayList<>(); - for (int i = 0; i < 75; i++) { + cntEntity = 75; + futures = new ArrayList<>(cntEntity); + for (int i = 0; i < cntEntity; i++) { String suffix = StringUtils.randomAlphanumeric(15); String name = title2 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); Edge edge = constructEdge(name, "default"); - edgesTitle2.add(doPost("/api/edge", edge, Edge.class)); + futures.add(executor.submit(() -> + doPost("/api/edge", edge, Edge.class))); } + List edgesTitle2 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS)); List loadedEdgesTitle1 = new ArrayList<>(); PageLink pageLink = new PageLink(15, 0, title1); @@ -494,24 +514,31 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest { public void testFindTenantEdgesByType() throws Exception { String title1 = "Edge title 1"; String type1 = "typeA"; - List edgesType1 = new ArrayList<>(); - for (int i = 0; i < 143; i++) { + int cntEntity = 143; + futures = new ArrayList<>(cntEntity); + for (int i = 0; i < cntEntity; i++) { String suffix = StringUtils.randomAlphanumeric(15); String name = title1 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); Edge edge = constructEdge(name, type1); - edgesType1.add(doPost("/api/edge", edge, Edge.class)); + futures.add(executor.submit(() -> + doPost("/api/edge", edge, Edge.class))); } + List edgesType1 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS)); + String title2 = "Edge title 2"; String type2 = "typeB"; - List edgesType2 = new ArrayList<>(); - for (int i = 0; i < 75; i++) { + cntEntity = 75; + futures = new ArrayList<>(cntEntity); + for (int i = 0; i < cntEntity; i++) { String suffix = StringUtils.randomAlphanumeric(15); String name = title2 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); Edge edge = constructEdge(name, type2); - edgesType2.add(doPost("/api/edge", edge, Edge.class)); + futures.add(executor.submit(() -> + doPost("/api/edge", edge, Edge.class))); } + List edgesType2 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS)); List loadedEdgesType1 = new ArrayList<>(); PageLink pageLink = new PageLink(15); @@ -582,14 +609,17 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest { Mockito.reset(tbClusterService, auditLogService); - List edges = new ArrayList<>(); int cntEntity = 128; + futures = new ArrayList<>(cntEntity); for (int i = 0; i < cntEntity; i++) { Edge edge = constructEdge("Edge" + i, "default"); - edge = doPost("/api/edge", edge, Edge.class); - edges.add(doPost("/api/customer/" + customerId.getId().toString() - + "/edge/" + edge.getId().getId().toString(), Edge.class)); + futures.add(executor.submit(() -> { + Edge edge1 = doPost("/api/edge", edge, Edge.class); + return doPost("/api/customer/" + customerId.getId().toString() + + "/edge/" + edge1.getId().getId().toString(), Edge.class); + })); } + List edges = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS)); testNotifyManyEntityManyTimeMsgToEdgeServiceEntityEqAny(new Edge(), new Edge(), savedTenant.getId(), customerId, tenantAdmin.getId(), tenantAdmin.getEmail(), @@ -622,28 +652,37 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest { customer = doPost("/api/customer", customer, Customer.class); CustomerId customerId = customer.getId(); + int cntEntity = 125; String title1 = "Edge title 1"; - List edgesTitle1 = new ArrayList<>(); - for (int i = 0; i < 125; i++) { + futures = new ArrayList<>(cntEntity); + for (int i = 0; i < cntEntity; i++) { String suffix = StringUtils.randomAlphanumeric(15); String name = title1 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); Edge edge = constructEdge(name, "default"); - edge = doPost("/api/edge", edge, Edge.class); - edgesTitle1.add(doPost("/api/customer/" + customerId.getId().toString() - + "/edge/" + edge.getId().getId().toString(), Edge.class)); + futures.add(executor.submit(() -> { + Edge edge1 = doPost("/api/edge", edge, Edge.class); + return doPost("/api/customer/" + customerId.getId().toString() + + "/edge/" + edge1.getId().getId().toString(), Edge.class); + })); } + List edgesTitle1 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS)); + + cntEntity = 143; String title2 = "Edge title 2"; - List edgesTitle2 = new ArrayList<>(); - for (int i = 0; i < 143; i++) { + futures = new ArrayList<>(cntEntity); + for (int i = 0; i < cntEntity; i++) { String suffix = StringUtils.randomAlphanumeric(15); String name = title2 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); Edge edge = constructEdge(name, "default"); - edge = doPost("/api/edge", edge, Edge.class); - edgesTitle2.add(doPost("/api/customer/" + customerId.getId().toString() - + "/edge/" + edge.getId().getId().toString(), Edge.class)); + futures.add(executor.submit(() -> { + Edge edge1 = doPost("/api/edge", edge, Edge.class); + return doPost("/api/customer/" + customerId.getId().toString() + + "/edge/" + edge1.getId().getId().toString(), Edge.class); + })); } + List edgesTitle2 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS)); List loadedEdgesTitle1 = new ArrayList<>(); PageLink pageLink = new PageLink(15, 0, title1); @@ -687,7 +726,7 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest { .andExpect(status().isOk()); } - int cntEntity = loadedEdgesTitle1.size(); + cntEntity = loadedEdgesTitle1.size(); testNotifyManyEntityManyTimeMsgToEdgeServiceEntityEqAnyAdditionalInfoAny(new Edge(), new Edge(), savedTenant.getId(), customerId, tenantAdmin.getId(), tenantAdmin.getEmail(), ActionType.UNASSIGNED_FROM_CUSTOMER, ActionType.UNASSIGNED_FROM_CUSTOMER, cntEntity, cntEntity, 3); @@ -719,30 +758,39 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest { customer = doPost("/api/customer", customer, Customer.class); CustomerId customerId = customer.getId(); + int cntEntity = 125; String title1 = "Edge title 1"; String type1 = "typeC"; - List edgesType1 = new ArrayList<>(); - for (int i = 0; i < 125; i++) { + futures = new ArrayList<>(cntEntity); + for (int i = 0; i < cntEntity; i++) { String suffix = StringUtils.randomAlphanumeric(15); String name = title1 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); Edge edge = constructEdge(name, type1); - edge = doPost("/api/edge", edge, Edge.class); - edgesType1.add(doPost("/api/customer/" + customerId.getId().toString() - + "/edge/" + edge.getId().getId().toString(), Edge.class)); + futures.add(executor.submit(() -> { + Edge edge1 = doPost("/api/edge", edge, Edge.class); + return doPost("/api/customer/" + customerId.getId().toString() + + "/edge/" + edge1.getId().getId().toString(), Edge.class); + })); } + List edgesType1 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS)); + + cntEntity = 143; String title2 = "Edge title 2"; String type2 = "typeD"; - List edgesType2 = new ArrayList<>(); - for (int i = 0; i < 143; i++) { + futures = new ArrayList<>(cntEntity); + for (int i = 0; i < cntEntity; i++) { String suffix = StringUtils.randomAlphanumeric(15); String name = title2 + suffix; name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase(); Edge edge = constructEdge(name, type2); - edge = doPost("/api/edge", edge, Edge.class); - edgesType2.add(doPost("/api/customer/" + customerId.getId().toString() - + "/edge/" + edge.getId().getId().toString(), Edge.class)); + futures.add(executor.submit(() -> { + Edge edge1 = doPost("/api/edge", edge, Edge.class); + return doPost("/api/customer/" + customerId.getId().toString() + + "/edge/" + edge1.getId().getId().toString(), Edge.class); + })); } + List edgesType2 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS)); List loadedEdgesType1 = new ArrayList<>(); PageLink pageLink = new PageLink(15, 0, title1); diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java index 804f014bdc..d2e309c640 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseDeviceEdgeTest.java @@ -501,7 +501,6 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest { Assert.assertTrue(deviceUpdateMsgOpt.isPresent()); DeviceUpdateMsg latestDeviceUpdateMsg = deviceUpdateMsgOpt.get(); Assert.assertNotEquals(deviceOnCloudName, latestDeviceUpdateMsg.getName()); - Assert.assertEquals(deviceOnCloudName, latestDeviceUpdateMsg.getConflictName()); UUID newDeviceId = new UUID(latestDeviceUpdateMsg.getIdMSB(), latestDeviceUpdateMsg.getIdLSB()); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java index bba8767fca..cbd532b479 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java @@ -34,5 +34,5 @@ public enum EdgeEventActionType { ASSIGNED_TO_EDGE, UNASSIGNED_FROM_EDGE, CREDENTIALS_REQUEST, - ENTITY_MERGE_REQUEST + ENTITY_MERGE_REQUEST // deprecated } \ No newline at end of file diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index b962badbcd..bf512467cc 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -111,7 +111,7 @@ enum UpdateMsgType { ENTITY_DELETED_RPC_MESSAGE = 2; ALARM_ACK_RPC_MESSAGE = 3; ALARM_CLEAR_RPC_MESSAGE = 4; - ENTITY_MERGE_RPC_MESSAGE = 5; + ENTITY_MERGE_RPC_MESSAGE = 5; // deprecated } message EntityDataProto { @@ -199,7 +199,7 @@ message DeviceUpdateMsg { string type = 9; optional string label = 10; optional string additionalInfo = 11; - optional string conflictName = 12; + optional string conflictName = 12; // deprecated optional int64 firmwareIdMSB = 13; optional int64 firmwareIdLSB = 14; optional bytes deviceDataBytes = 15;