From 8bea1016a33a9ab6404430f30f654543f36bdeb3 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 23 Jan 2023 16:13:35 +0200 Subject: [PATCH] Move saveOrCreateDevice to base device processor class --- .../service/edge/rpc/EdgeGrpcSession.java | 2 +- .../processor/device/BaseDeviceProcessor.java | 70 ++++++++++++++ .../processor/device/DeviceEdgeProcessor.java | 91 ++++--------------- 3 files changed, 88 insertions(+), 75 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 623573feff..db19461aa4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -560,7 +560,7 @@ public final class EdgeGrpcSession implements Closeable { } if (uplinkMsg.getDeviceUpdateMsgCount() > 0) { for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) { - result.add(ctx.getDeviceProcessor().processDeviceFromEdge(edge.getTenantId(), edge, deviceUpdateMsg)); + result.add(ctx.getDeviceProcessor().processDeviceMsgFromEdge(edge.getTenantId(), edge, deviceUpdateMsg)); } } if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java index 66b14c0f59..bedc372ce6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/BaseDeviceProcessor.java @@ -15,19 +15,30 @@ */ package org.thingsboard.server.service.edge.rpc.processor.device; +import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomStringUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.util.Pair; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.device.data.DeviceData; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.OtaPackageId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; +import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; +import java.util.Optional; import java.util.UUID; @Slf4j @@ -36,6 +47,65 @@ public abstract class BaseDeviceProcessor extends BaseEdgeProcessor { @Autowired protected DataDecodingEncodingService dataDecodingEncodingService; + protected Pair saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, CustomerId customerId) { + boolean created = false; + boolean deviceNameUpdated = false; + deviceCreationLock.lock(); + try { + Device device = deviceService.findDeviceById(tenantId, deviceId); + String deviceName = deviceUpdateMsg.getName(); + if (device == null) { + created = true; + device = new Device(); + device.setTenantId(tenantId); + device.setCreatedTime(Uuids.unixTimestamp(deviceId.getId())); + Device deviceByName = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName); + if (deviceByName != null) { + deviceName = deviceName + "_" + RandomStringUtils.randomAlphabetic(15); + log.warn("Device with name {} already exists on the cloud. Renaming device name to {}", + deviceUpdateMsg.getName(), deviceName); + deviceNameUpdated = true; + } + } + device.setName(deviceName); + device.setType(deviceUpdateMsg.getType()); + device.setLabel(deviceUpdateMsg.hasLabel() ? deviceUpdateMsg.getLabel() : null); + device.setAdditionalInfo(deviceUpdateMsg.hasAdditionalInfo() + ? JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo()) : null); + + UUID deviceProfileUUID = safeGetUUID(deviceUpdateMsg.getDeviceProfileIdMSB(), deviceUpdateMsg.getDeviceProfileIdLSB()); + device.setDeviceProfileId(deviceProfileUUID != null ? new DeviceProfileId(deviceProfileUUID) : null); + + device.setCustomerId(customerId); + + Optional deviceDataOpt = + dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray()); + device.setDeviceData(deviceDataOpt.orElse(null)); + + UUID firmwareUUID = safeGetUUID(deviceUpdateMsg.getFirmwareIdMSB(), deviceUpdateMsg.getFirmwareIdLSB()); + device.setFirmwareId(firmwareUUID != null ? new OtaPackageId(firmwareUUID) : null); + + UUID softwareUUID = safeGetUUID(deviceUpdateMsg.getSoftwareIdMSB(), deviceUpdateMsg.getSoftwareIdLSB()); + device.setSoftwareId(softwareUUID != null ? new OtaPackageId(softwareUUID) : null); + deviceValidator.validate(device, Device::getTenantId); + if (created) { + device.setId(deviceId); + } + Device savedDevice = deviceService.saveDevice(device, false); + if (created) { + DeviceCredentials deviceCredentials = new DeviceCredentials(); + deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId())); + deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); + deviceCredentials.setCredentialsId(StringUtils.randomAlphanumeric(20)); + deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials); + } + tbClusterService.onDeviceUpdated(savedDevice, created ? null : device, false); + } finally { + deviceCreationLock.unlock(); + } + return Pair.of(created, deviceNameUpdated); + } + public ListenableFuture processDeviceCredentialsMsg(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { log.debug("[{}] Executing processDeviceCredentialsMsg, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg); DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB())); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java index d7c2daf2df..11f6d3f033 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessor.java @@ -15,14 +15,13 @@ */ package org.thingsboard.server.service.edge.rpc.processor.device; -import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.RandomStringUtils; +import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.DataConstants; @@ -30,23 +29,19 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.data.device.data.DeviceData; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.OtaPackageId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.data.security.DeviceCredentials; -import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -65,7 +60,6 @@ import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; -import java.util.Optional; import java.util.UUID; @Component @@ -73,8 +67,8 @@ import java.util.UUID; @TbCoreComponent public class DeviceEdgeProcessor extends BaseDeviceProcessor { - public ListenableFuture processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { - log.trace("[{}] executing processDeviceFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); + public ListenableFuture processDeviceMsgFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { + log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); try { switch (deviceUpdateMsg.getMsgType()) { @@ -103,68 +97,17 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor { } private void saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, Edge edge) { - deviceCreationLock.lock(); - try { - Device device = deviceService.findDeviceById(tenantId, deviceId); - boolean created = false; - boolean deviceNameUpdated = false; - String deviceName = deviceUpdateMsg.getName(); - if (device == null) { - created = true; - device = new Device(); - device.setTenantId(tenantId); - device.setCreatedTime(Uuids.unixTimestamp(deviceId.getId())); - Device deviceByName = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName); - if (deviceByName != null) { - deviceName = deviceName + "_" + RandomStringUtils.randomAlphabetic(15); - log.warn("Device with name {} already exists on the cloud. Renaming device name to {}", - deviceUpdateMsg.getName(), deviceName); - deviceNameUpdated = true; - } - } - device.setName(deviceName); - device.setType(deviceUpdateMsg.getType()); - device.setLabel(deviceUpdateMsg.hasLabel() ? deviceUpdateMsg.getLabel() : null); - device.setAdditionalInfo(deviceUpdateMsg.hasAdditionalInfo() - ? JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo()) : null); - - UUID deviceProfileUUID = safeGetUUID(deviceUpdateMsg.getDeviceProfileIdMSB(), deviceUpdateMsg.getDeviceProfileIdLSB()); - device.setDeviceProfileId(deviceProfileUUID != null ? new DeviceProfileId(deviceProfileUUID) : null); - - device.setCustomerId(safeGetCustomerId(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB())); - - Optional deviceDataOpt = - dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray()); - device.setDeviceData(deviceDataOpt.orElse(null)); - - UUID firmwareUUID = safeGetUUID(deviceUpdateMsg.getFirmwareIdMSB(), deviceUpdateMsg.getFirmwareIdLSB()); - device.setFirmwareId(firmwareUUID != null ? new OtaPackageId(firmwareUUID) : null); - - UUID softwareUUID = safeGetUUID(deviceUpdateMsg.getSoftwareIdMSB(), deviceUpdateMsg.getSoftwareIdLSB()); - device.setSoftwareId(softwareUUID != null ? new OtaPackageId(softwareUUID) : null); - deviceValidator.validate(device, Device::getTenantId); - if (created) { - device.setId(deviceId); - } - Device savedDevice = deviceService.saveDevice(device, false); - if (created) { - DeviceCredentials deviceCredentials = new DeviceCredentials(); - deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId())); - deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); - deviceCredentials.setCredentialsId(StringUtils.randomAlphanumeric(20)); - deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials); - - createRelationFromEdge(tenantId, edge.getId(), device.getId()); - pushDeviceCreatedEventToRuleEngine(tenantId, edge, device); - deviceService.assignDeviceToEdge(tenantId, device.getId(), edge.getId()); - } - tbClusterService.onDeviceUpdated(savedDevice, created ? null : device, false); - - if (deviceNameUpdated) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.UPDATED, deviceId, null); - } - } finally { - deviceCreationLock.unlock(); + CustomerId customerId = safeGetCustomerId(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB()); + Pair resultPair = super.saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, customerId); + Boolean created = resultPair.getFirst(); + if (created) { + createRelationFromEdge(tenantId, edge.getId(), deviceId); + pushDeviceCreatedEventToRuleEngine(tenantId, edge, deviceId); + deviceService.assignDeviceToEdge(tenantId, deviceId, edge.getId()); + } + Boolean deviceNameUpdated = resultPair.getSecond(); + if (deviceNameUpdated) { + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.UPDATED, deviceId, null); } } @@ -177,9 +120,9 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor { relationService.saveRelation(tenantId, relation); } - private void pushDeviceCreatedEventToRuleEngine(TenantId tenantId, Edge edge, Device device) { + private void pushDeviceCreatedEventToRuleEngine(TenantId tenantId, Edge edge, DeviceId deviceId) { try { - DeviceId deviceId = device.getId(); + Device device = deviceService.findDeviceById(tenantId, deviceId); ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(device); TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, device.getCustomerId(), getActionTbMsgMetaData(edge, device.getCustomerId()), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); @@ -195,7 +138,7 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor { } }); } catch (JsonProcessingException | IllegalArgumentException e) { - log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), DataConstants.ENTITY_CREATED, e); + log.warn("[{}] Failed to push device action to rule engine: {}", deviceId, DataConstants.ENTITY_CREATED, e); } }