Move saveOrCreateDevice to base device processor class

This commit is contained in:
Volodymyr Babak 2023-01-23 16:13:35 +02:00
parent e8c56dfe06
commit 8bea1016a3
3 changed files with 88 additions and 75 deletions

View File

@ -560,7 +560,7 @@ public final class EdgeGrpcSession implements Closeable {
} }
if (uplinkMsg.getDeviceUpdateMsgCount() > 0) { if (uplinkMsg.getDeviceUpdateMsgCount() > 0) {
for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) { for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) {
result.add(ctx.getDeviceProcessor().processDeviceFromEdge(edge.getTenantId(), edge, deviceUpdateMsg)); result.add(ctx.getDeviceProcessor().processDeviceMsgFromEdge(edge.getTenantId(), edge, deviceUpdateMsg));
} }
} }
if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) { if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) {

View File

@ -15,19 +15,30 @@
*/ */
package org.thingsboard.server.service.edge.rpc.processor.device; package org.thingsboard.server.service.edge.rpc.processor.device;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.util.Pair;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.device.data.DeviceData;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.OtaPackageId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg;
import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@Slf4j @Slf4j
@ -36,6 +47,65 @@ public abstract class BaseDeviceProcessor extends BaseEdgeProcessor {
@Autowired @Autowired
protected DataDecodingEncodingService dataDecodingEncodingService; protected DataDecodingEncodingService dataDecodingEncodingService;
protected Pair<Boolean, Boolean> saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, CustomerId customerId) {
boolean created = false;
boolean deviceNameUpdated = false;
deviceCreationLock.lock();
try {
Device device = deviceService.findDeviceById(tenantId, deviceId);
String deviceName = deviceUpdateMsg.getName();
if (device == null) {
created = true;
device = new Device();
device.setTenantId(tenantId);
device.setCreatedTime(Uuids.unixTimestamp(deviceId.getId()));
Device deviceByName = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName);
if (deviceByName != null) {
deviceName = deviceName + "_" + RandomStringUtils.randomAlphabetic(15);
log.warn("Device with name {} already exists on the cloud. Renaming device name to {}",
deviceUpdateMsg.getName(), deviceName);
deviceNameUpdated = true;
}
}
device.setName(deviceName);
device.setType(deviceUpdateMsg.getType());
device.setLabel(deviceUpdateMsg.hasLabel() ? deviceUpdateMsg.getLabel() : null);
device.setAdditionalInfo(deviceUpdateMsg.hasAdditionalInfo()
? JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo()) : null);
UUID deviceProfileUUID = safeGetUUID(deviceUpdateMsg.getDeviceProfileIdMSB(), deviceUpdateMsg.getDeviceProfileIdLSB());
device.setDeviceProfileId(deviceProfileUUID != null ? new DeviceProfileId(deviceProfileUUID) : null);
device.setCustomerId(customerId);
Optional<DeviceData> deviceDataOpt =
dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray());
device.setDeviceData(deviceDataOpt.orElse(null));
UUID firmwareUUID = safeGetUUID(deviceUpdateMsg.getFirmwareIdMSB(), deviceUpdateMsg.getFirmwareIdLSB());
device.setFirmwareId(firmwareUUID != null ? new OtaPackageId(firmwareUUID) : null);
UUID softwareUUID = safeGetUUID(deviceUpdateMsg.getSoftwareIdMSB(), deviceUpdateMsg.getSoftwareIdLSB());
device.setSoftwareId(softwareUUID != null ? new OtaPackageId(softwareUUID) : null);
deviceValidator.validate(device, Device::getTenantId);
if (created) {
device.setId(deviceId);
}
Device savedDevice = deviceService.saveDevice(device, false);
if (created) {
DeviceCredentials deviceCredentials = new DeviceCredentials();
deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId()));
deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
deviceCredentials.setCredentialsId(StringUtils.randomAlphanumeric(20));
deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials);
}
tbClusterService.onDeviceUpdated(savedDevice, created ? null : device, false);
} finally {
deviceCreationLock.unlock();
}
return Pair.of(created, deviceNameUpdated);
}
public ListenableFuture<Void> processDeviceCredentialsMsg(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { public ListenableFuture<Void> processDeviceCredentialsMsg(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
log.debug("[{}] Executing processDeviceCredentialsMsg, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg); log.debug("[{}] Executing processDeviceCredentialsMsg, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB())); DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB()));

View File

@ -15,14 +15,13 @@
*/ */
package org.thingsboard.server.service.edge.rpc.processor.device; package org.thingsboard.server.service.edge.rpc.processor.device;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils; import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
@ -30,23 +29,19 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.device.data.DeviceData;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.OtaPackageId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -65,7 +60,6 @@ import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@Component @Component
@ -73,8 +67,8 @@ import java.util.UUID;
@TbCoreComponent @TbCoreComponent
public class DeviceEdgeProcessor extends BaseDeviceProcessor { public class DeviceEdgeProcessor extends BaseDeviceProcessor {
public ListenableFuture<Void> processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { public ListenableFuture<Void> processDeviceMsgFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
log.trace("[{}] executing processDeviceFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
try { try {
switch (deviceUpdateMsg.getMsgType()) { switch (deviceUpdateMsg.getMsgType()) {
@ -103,69 +97,18 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor {
} }
private void saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, Edge edge) { private void saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, Edge edge) {
deviceCreationLock.lock(); CustomerId customerId = safeGetCustomerId(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB());
try { Pair<Boolean, Boolean> resultPair = super.saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, customerId);
Device device = deviceService.findDeviceById(tenantId, deviceId); Boolean created = resultPair.getFirst();
boolean created = false;
boolean deviceNameUpdated = false;
String deviceName = deviceUpdateMsg.getName();
if (device == null) {
created = true;
device = new Device();
device.setTenantId(tenantId);
device.setCreatedTime(Uuids.unixTimestamp(deviceId.getId()));
Device deviceByName = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName);
if (deviceByName != null) {
deviceName = deviceName + "_" + RandomStringUtils.randomAlphabetic(15);
log.warn("Device with name {} already exists on the cloud. Renaming device name to {}",
deviceUpdateMsg.getName(), deviceName);
deviceNameUpdated = true;
}
}
device.setName(deviceName);
device.setType(deviceUpdateMsg.getType());
device.setLabel(deviceUpdateMsg.hasLabel() ? deviceUpdateMsg.getLabel() : null);
device.setAdditionalInfo(deviceUpdateMsg.hasAdditionalInfo()
? JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo()) : null);
UUID deviceProfileUUID = safeGetUUID(deviceUpdateMsg.getDeviceProfileIdMSB(), deviceUpdateMsg.getDeviceProfileIdLSB());
device.setDeviceProfileId(deviceProfileUUID != null ? new DeviceProfileId(deviceProfileUUID) : null);
device.setCustomerId(safeGetCustomerId(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB()));
Optional<DeviceData> deviceDataOpt =
dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray());
device.setDeviceData(deviceDataOpt.orElse(null));
UUID firmwareUUID = safeGetUUID(deviceUpdateMsg.getFirmwareIdMSB(), deviceUpdateMsg.getFirmwareIdLSB());
device.setFirmwareId(firmwareUUID != null ? new OtaPackageId(firmwareUUID) : null);
UUID softwareUUID = safeGetUUID(deviceUpdateMsg.getSoftwareIdMSB(), deviceUpdateMsg.getSoftwareIdLSB());
device.setSoftwareId(softwareUUID != null ? new OtaPackageId(softwareUUID) : null);
deviceValidator.validate(device, Device::getTenantId);
if (created) { if (created) {
device.setId(deviceId); createRelationFromEdge(tenantId, edge.getId(), deviceId);
pushDeviceCreatedEventToRuleEngine(tenantId, edge, deviceId);
deviceService.assignDeviceToEdge(tenantId, deviceId, edge.getId());
} }
Device savedDevice = deviceService.saveDevice(device, false); Boolean deviceNameUpdated = resultPair.getSecond();
if (created) {
DeviceCredentials deviceCredentials = new DeviceCredentials();
deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId()));
deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
deviceCredentials.setCredentialsId(StringUtils.randomAlphanumeric(20));
deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials);
createRelationFromEdge(tenantId, edge.getId(), device.getId());
pushDeviceCreatedEventToRuleEngine(tenantId, edge, device);
deviceService.assignDeviceToEdge(tenantId, device.getId(), edge.getId());
}
tbClusterService.onDeviceUpdated(savedDevice, created ? null : device, false);
if (deviceNameUpdated) { if (deviceNameUpdated) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.UPDATED, deviceId, null); saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.UPDATED, deviceId, null);
} }
} finally {
deviceCreationLock.unlock();
}
} }
private void createRelationFromEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId) { private void createRelationFromEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId) {
@ -177,9 +120,9 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor {
relationService.saveRelation(tenantId, relation); relationService.saveRelation(tenantId, relation);
} }
private void pushDeviceCreatedEventToRuleEngine(TenantId tenantId, Edge edge, Device device) { private void pushDeviceCreatedEventToRuleEngine(TenantId tenantId, Edge edge, DeviceId deviceId) {
try { try {
DeviceId deviceId = device.getId(); Device device = deviceService.findDeviceById(tenantId, deviceId);
ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(device); ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(device);
TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, device.getCustomerId(), TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, device.getCustomerId(),
getActionTbMsgMetaData(edge, device.getCustomerId()), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode)); getActionTbMsgMetaData(edge, device.getCustomerId()), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode));
@ -195,7 +138,7 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor {
} }
}); });
} catch (JsonProcessingException | IllegalArgumentException e) { } catch (JsonProcessingException | IllegalArgumentException e) {
log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), DataConstants.ENTITY_CREATED, e); log.warn("[{}] Failed to push device action to rule engine: {}", deviceId, DataConstants.ENTITY_CREATED, e);
} }
} }