Introduced BaseDeviceEdgeProcessor
This commit is contained in:
parent
69285ee596
commit
94c69acf5d
@ -457,7 +457,6 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
case ASSIGNED_TO_CUSTOMER:
|
case ASSIGNED_TO_CUSTOMER:
|
||||||
case UNASSIGNED_FROM_CUSTOMER:
|
case UNASSIGNED_FROM_CUSTOMER:
|
||||||
case CREDENTIALS_REQUEST:
|
case CREDENTIALS_REQUEST:
|
||||||
case ENTITY_MERGE_REQUEST:
|
|
||||||
case RPC_CALL:
|
case RPC_CALL:
|
||||||
downlinkMsg = convertEntityEventToDownlink(edgeEvent);
|
downlinkMsg = convertEntityEventToDownlink(edgeEvent);
|
||||||
log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg);
|
log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg);
|
||||||
@ -566,7 +565,7 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
}
|
}
|
||||||
if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) {
|
if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) {
|
||||||
for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) {
|
for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) {
|
||||||
result.add(ctx.getDeviceProcessor().processDeviceCredentialsFromEdge(edge.getTenantId(), deviceCredentialsUpdateMsg));
|
result.add(ctx.getDeviceProcessor().processDeviceCredentialsMsg(edge.getTenantId(), deviceCredentialsUpdateMsg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (uplinkMsg.getAlarmUpdateMsgCount() > 0) {
|
if (uplinkMsg.getAlarmUpdateMsgCount() > 0) {
|
||||||
|
|||||||
@ -41,7 +41,7 @@ public class DeviceMsgConstructor {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private DataDecodingEncodingService dataDecodingEncodingService;
|
private DataDecodingEncodingService dataDecodingEncodingService;
|
||||||
|
|
||||||
public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device, String conflictName) {
|
public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device) {
|
||||||
DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder()
|
DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder()
|
||||||
.setMsgType(msgType)
|
.setMsgType(msgType)
|
||||||
.setIdMSB(device.getId().getId().getMostSignificantBits())
|
.setIdMSB(device.getId().getId().getMostSignificantBits())
|
||||||
@ -70,9 +70,6 @@ public class DeviceMsgConstructor {
|
|||||||
builder.setSoftwareIdMSB(device.getSoftwareId().getId().getMostSignificantBits())
|
builder.setSoftwareIdMSB(device.getSoftwareId().getId().getMostSignificantBits())
|
||||||
.setSoftwareIdLSB(device.getSoftwareId().getId().getLeastSignificantBits());
|
.setSoftwareIdLSB(device.getSoftwareId().getId().getLeastSignificantBits());
|
||||||
}
|
}
|
||||||
if (conflictName != null) {
|
|
||||||
builder.setConflictName(conflictName);
|
|
||||||
}
|
|
||||||
if (device.getDeviceData() != null) {
|
if (device.getDeviceData() != null) {
|
||||||
builder.setDeviceDataBytes(ByteString.copyFrom(dataDecodingEncodingService.encode(device.getDeviceData())));
|
builder.setDeviceDataBytes(ByteString.copyFrom(dataDecodingEncodingService.encode(device.getDeviceData())));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,65 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2022 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.service.edge.rpc.processor.device;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.Futures;
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.thingsboard.server.common.data.Device;
|
||||||
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||||
|
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
||||||
|
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg;
|
||||||
|
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
||||||
|
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public abstract class BaseDeviceEdgeProcessor extends BaseEdgeProcessor {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
protected DataDecodingEncodingService dataDecodingEncodingService;
|
||||||
|
|
||||||
|
public ListenableFuture<Void> processDeviceCredentialsMsg(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
|
||||||
|
log.debug("[{}] Executing processDeviceCredentialsMsg, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
|
||||||
|
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB()));
|
||||||
|
ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId);
|
||||||
|
return Futures.transform(deviceFuture, device -> {
|
||||||
|
if (device != null) {
|
||||||
|
log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]",
|
||||||
|
device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue());
|
||||||
|
try {
|
||||||
|
DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, device.getId());
|
||||||
|
deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType()));
|
||||||
|
deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId());
|
||||||
|
deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.hasCredentialsValue()
|
||||||
|
? deviceCredentialsUpdateMsg.getCredentialsValue() : null);
|
||||||
|
deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]",
|
||||||
|
device.getName(), deviceCredentialsUpdateMsg, e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.warn("Can't find device by id [{}], deviceCredentialsUpdateMsg [{}]", deviceId, deviceCredentialsUpdateMsg);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}, dbCallbackExecutorService);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -22,7 +22,7 @@ import com.google.common.util.concurrent.Futures;
|
|||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
import org.thingsboard.server.common.data.DataConstants;
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
@ -42,8 +42,6 @@ import org.thingsboard.server.common.data.id.EdgeId;
|
|||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.OtaPackageId;
|
import org.thingsboard.server.common.data.id.OtaPackageId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.page.PageData;
|
|
||||||
import org.thingsboard.server.common.data.page.PageLink;
|
|
||||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||||
import org.thingsboard.server.common.data.rpc.RpcError;
|
import org.thingsboard.server.common.data.rpc.RpcError;
|
||||||
@ -64,9 +62,7 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
|||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.queue.TbQueueCallback;
|
import org.thingsboard.server.queue.TbQueueCallback;
|
||||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
||||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
|
||||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
|
||||||
import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
|
import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
|
||||||
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
@ -75,54 +71,17 @@ import java.util.UUID;
|
|||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@TbCoreComponent
|
@TbCoreComponent
|
||||||
public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
public class DeviceEdgeProcessor extends BaseDeviceEdgeProcessor {
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private DataDecodingEncodingService dataDecodingEncodingService;
|
|
||||||
|
|
||||||
public ListenableFuture<Void> processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
|
public ListenableFuture<Void> processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
|
||||||
|
log.trace("[{}] executing processDeviceFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
|
||||||
|
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
|
||||||
try {
|
try {
|
||||||
log.trace("[{}] processDeviceFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
|
|
||||||
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
|
|
||||||
switch (deviceUpdateMsg.getMsgType()) {
|
switch (deviceUpdateMsg.getMsgType()) {
|
||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
String deviceName = deviceUpdateMsg.getName();
|
|
||||||
Device device = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName);
|
|
||||||
if (device != null) {
|
|
||||||
boolean deviceAlreadyExistsForThisEdge = isDeviceAlreadyExistsOnCloudForThisEdge(tenantId, edge, device);
|
|
||||||
if (deviceAlreadyExistsForThisEdge) {
|
|
||||||
log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " +
|
|
||||||
"deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg);
|
|
||||||
return updateDevice(tenantId, edge, deviceUpdateMsg);
|
|
||||||
} else {
|
|
||||||
log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." +
|
|
||||||
"Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg);
|
|
||||||
String newDeviceName = deviceUpdateMsg.getName() + "_" + StringUtils.randomAlphabetic(15);
|
|
||||||
try {
|
|
||||||
createDevice(tenantId, deviceId, edge, deviceUpdateMsg, newDeviceName);
|
|
||||||
} catch (DataValidationException e) {
|
|
||||||
log.error("[{}] Device update msg can't be processed due to data validation [{}]", tenantId, deviceUpdateMsg, e);
|
|
||||||
return Futures.immediateFuture(null);
|
|
||||||
}
|
|
||||||
ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode();
|
|
||||||
body.put("conflictName", deviceName);
|
|
||||||
ListenableFuture<Void> input = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, deviceId, body);
|
|
||||||
return Futures.transformAsync(input, unused ->
|
|
||||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null),
|
|
||||||
dbCallbackExecutorService);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.info("[{}] Creating new device on the cloud [{}]", tenantId, deviceUpdateMsg);
|
|
||||||
try {
|
|
||||||
createDevice(tenantId, deviceId, edge, deviceUpdateMsg, deviceUpdateMsg.getName());
|
|
||||||
} catch (DataValidationException e) {
|
|
||||||
log.error("[{}] Device update msg can't be processed due to data validation [{}]", tenantId, deviceUpdateMsg, e);
|
|
||||||
return Futures.immediateFuture(null);
|
|
||||||
}
|
|
||||||
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null);
|
|
||||||
}
|
|
||||||
case ENTITY_UPDATED_RPC_MESSAGE:
|
case ENTITY_UPDATED_RPC_MESSAGE:
|
||||||
return updateDevice(tenantId, edge, deviceUpdateMsg);
|
saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, edge);
|
||||||
|
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null);
|
||||||
case ENTITY_DELETED_RPC_MESSAGE:
|
case ENTITY_DELETED_RPC_MESSAGE:
|
||||||
Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId);
|
Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId);
|
||||||
if (deviceToDelete != null) {
|
if (deviceToDelete != null) {
|
||||||
@ -133,64 +92,35 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
|||||||
default:
|
default:
|
||||||
return handleUnsupportedMsgType(deviceUpdateMsg.getMsgType());
|
return handleUnsupportedMsgType(deviceUpdateMsg.getMsgType());
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (DataValidationException e) {
|
||||||
log.error("Failed to process device message from edge, {}", deviceUpdateMsg, e);
|
if (e.getMessage().contains("Can't create more then")) {
|
||||||
return Futures.immediateFailedFuture(e);
|
log.warn("[{}] Number of allowed devices violated {}", tenantId, deviceUpdateMsg, e);
|
||||||
|
return Futures.immediateFuture(null);
|
||||||
|
} else {
|
||||||
|
return Futures.immediateFailedFuture(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isDeviceAlreadyExistsOnCloudForThisEdge(TenantId tenantId, Edge edge, Device device) {
|
private void saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, Edge edge) {
|
||||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
|
||||||
PageData<EdgeId> pageData;
|
|
||||||
do {
|
|
||||||
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, device.getId(), pageLink);
|
|
||||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
|
||||||
if (pageData.getData().contains(edge.getId())) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (pageData.hasNext()) {
|
|
||||||
pageLink = pageLink.nextPageLink();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} while (pageData != null && pageData.hasNext());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ListenableFuture<Void> processDeviceCredentialsFromEdge(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
|
|
||||||
log.debug("[{}] Executing processDeviceCredentialsFromEdge, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
|
|
||||||
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB()));
|
|
||||||
ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId);
|
|
||||||
return Futures.transform(deviceFuture, device -> {
|
|
||||||
if (device != null) {
|
|
||||||
log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]",
|
|
||||||
device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue());
|
|
||||||
try {
|
|
||||||
DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, device.getId());
|
|
||||||
deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType()));
|
|
||||||
deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId());
|
|
||||||
if (deviceCredentialsUpdateMsg.hasCredentialsValue()) {
|
|
||||||
deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.getCredentialsValue());
|
|
||||||
}
|
|
||||||
deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]", device.getName(), deviceCredentialsUpdateMsg, e);
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}, dbCallbackExecutorService);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createDevice(TenantId tenantId, DeviceId deviceId, Edge edge, DeviceUpdateMsg deviceUpdateMsg, String deviceName) {
|
|
||||||
deviceCreationLock.lock();
|
deviceCreationLock.lock();
|
||||||
try {
|
try {
|
||||||
Device device = deviceService.findDeviceById(tenantId, deviceId);
|
Device device = deviceService.findDeviceById(tenantId, deviceId);
|
||||||
boolean created = false;
|
boolean created = false;
|
||||||
|
boolean deviceNameUpdated = false;
|
||||||
|
String deviceName = deviceUpdateMsg.getName();
|
||||||
if (device == null) {
|
if (device == null) {
|
||||||
|
created = true;
|
||||||
device = new Device();
|
device = new Device();
|
||||||
device.setTenantId(tenantId);
|
device.setTenantId(tenantId);
|
||||||
device.setCreatedTime(Uuids.unixTimestamp(deviceId.getId()));
|
device.setCreatedTime(Uuids.unixTimestamp(deviceId.getId()));
|
||||||
created = true;
|
Device deviceByName = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName);
|
||||||
|
if (deviceByName != null) {
|
||||||
|
deviceName = deviceName + "_" + RandomStringUtils.randomAlphabetic(15);
|
||||||
|
log.warn("Device with name {} already exists on the cloud. Renaming device name to {}",
|
||||||
|
deviceUpdateMsg.getName(), deviceName);
|
||||||
|
deviceNameUpdated = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
device.setName(deviceName);
|
device.setName(deviceName);
|
||||||
device.setType(deviceUpdateMsg.getType());
|
device.setType(deviceUpdateMsg.getType());
|
||||||
@ -212,7 +142,6 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
|||||||
|
|
||||||
UUID softwareUUID = safeGetUUID(deviceUpdateMsg.getSoftwareIdMSB(), deviceUpdateMsg.getSoftwareIdLSB());
|
UUID softwareUUID = safeGetUUID(deviceUpdateMsg.getSoftwareIdMSB(), deviceUpdateMsg.getSoftwareIdLSB());
|
||||||
device.setSoftwareId(softwareUUID != null ? new OtaPackageId(softwareUUID) : null);
|
device.setSoftwareId(softwareUUID != null ? new OtaPackageId(softwareUUID) : null);
|
||||||
|
|
||||||
deviceValidator.validate(device, Device::getTenantId);
|
deviceValidator.validate(device, Device::getTenantId);
|
||||||
if (created) {
|
if (created) {
|
||||||
device.setId(deviceId);
|
device.setId(deviceId);
|
||||||
@ -224,45 +153,21 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
|||||||
deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
|
deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
|
||||||
deviceCredentials.setCredentialsId(StringUtils.randomAlphanumeric(20));
|
deviceCredentials.setCredentialsId(StringUtils.randomAlphanumeric(20));
|
||||||
deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials);
|
deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials);
|
||||||
|
|
||||||
|
createRelationFromEdge(tenantId, edge.getId(), device.getId());
|
||||||
|
pushDeviceCreatedEventToRuleEngine(tenantId, edge, device);
|
||||||
|
deviceService.assignDeviceToEdge(tenantId, device.getId(), edge.getId());
|
||||||
}
|
}
|
||||||
createRelationFromEdge(tenantId, edge.getId(), device.getId());
|
|
||||||
pushDeviceCreatedEventToRuleEngine(tenantId, edge, device);
|
|
||||||
deviceService.assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId());
|
|
||||||
tbClusterService.onDeviceUpdated(savedDevice, created ? null : device, false);
|
tbClusterService.onDeviceUpdated(savedDevice, created ? null : device, false);
|
||||||
|
|
||||||
|
if (deviceNameUpdated) {
|
||||||
|
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.UPDATED, deviceId, null);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
deviceCreationLock.unlock();
|
deviceCreationLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ListenableFuture<Void> updateDevice(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
|
|
||||||
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
|
|
||||||
Device device = deviceService.findDeviceById(tenantId, deviceId);
|
|
||||||
device.setName(deviceUpdateMsg.getName());
|
|
||||||
device.setType(deviceUpdateMsg.getType());
|
|
||||||
device.setLabel(deviceUpdateMsg.hasLabel() ? deviceUpdateMsg.getLabel() : null);
|
|
||||||
device.setAdditionalInfo(deviceUpdateMsg.hasAdditionalInfo()
|
|
||||||
? JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo()) : null);
|
|
||||||
|
|
||||||
UUID deviceProfileUUID = safeGetUUID(deviceUpdateMsg.getDeviceProfileIdMSB(), deviceUpdateMsg.getDeviceProfileIdLSB());
|
|
||||||
device.setDeviceProfileId(deviceProfileUUID != null ? new DeviceProfileId(deviceProfileUUID) : null);
|
|
||||||
|
|
||||||
device.setCustomerId(safeGetCustomerId(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB()));
|
|
||||||
|
|
||||||
Optional<DeviceData> deviceDataOpt =
|
|
||||||
dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray());
|
|
||||||
device.setDeviceData(deviceDataOpt.orElse(null));
|
|
||||||
|
|
||||||
UUID firmwareUUID = safeGetUUID(deviceUpdateMsg.getFirmwareIdMSB(), deviceUpdateMsg.getFirmwareIdLSB());
|
|
||||||
device.setFirmwareId(firmwareUUID != null ? new OtaPackageId(firmwareUUID) : null);
|
|
||||||
|
|
||||||
UUID softwareUUID = safeGetUUID(deviceUpdateMsg.getSoftwareIdMSB(), deviceUpdateMsg.getSoftwareIdLSB());
|
|
||||||
device.setSoftwareId(softwareUUID != null ? new OtaPackageId(softwareUUID) : null);
|
|
||||||
|
|
||||||
Device savedDevice = deviceService.saveDevice(device);
|
|
||||||
tbClusterService.onDeviceUpdated(savedDevice, device, false);
|
|
||||||
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createRelationFromEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId) {
|
private void createRelationFromEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId) {
|
||||||
EntityRelation relation = new EntityRelation();
|
EntityRelation relation = new EntityRelation();
|
||||||
relation.setFrom(edgeId);
|
relation.setFrom(edgeId);
|
||||||
@ -403,7 +308,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
|||||||
if (device != null) {
|
if (device != null) {
|
||||||
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
|
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
|
||||||
DeviceUpdateMsg deviceUpdateMsg =
|
DeviceUpdateMsg deviceUpdateMsg =
|
||||||
deviceMsgConstructor.constructDeviceUpdatedMsg(msgType, device, null);
|
deviceMsgConstructor.constructDeviceUpdatedMsg(msgType, device);
|
||||||
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
|
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
|
||||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||||
.addDeviceUpdateMsg(deviceUpdateMsg);
|
.addDeviceUpdateMsg(deviceUpdateMsg);
|
||||||
@ -438,8 +343,6 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
|||||||
return convertRpcCallEventToDownlink(edgeEvent);
|
return convertRpcCallEventToDownlink(edgeEvent);
|
||||||
case CREDENTIALS_REQUEST:
|
case CREDENTIALS_REQUEST:
|
||||||
return convertCredentialsRequestEventToDownlink(edgeEvent);
|
return convertCredentialsRequestEventToDownlink(edgeEvent);
|
||||||
case ENTITY_MERGE_REQUEST:
|
|
||||||
return convertEntityMergeRequestEventToDownlink(edgeEvent);
|
|
||||||
}
|
}
|
||||||
return downlinkMsg;
|
return downlinkMsg;
|
||||||
}
|
}
|
||||||
@ -463,21 +366,6 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
|||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public DownlinkMsg convertEntityMergeRequestEventToDownlink(EdgeEvent edgeEvent) {
|
|
||||||
DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
|
|
||||||
Device device = deviceService.findDeviceById(edgeEvent.getTenantId(), deviceId);
|
|
||||||
String conflictName = null;
|
|
||||||
if(edgeEvent.getBody() != null) {
|
|
||||||
conflictName = edgeEvent.getBody().get("conflictName").asText();
|
|
||||||
}
|
|
||||||
DeviceUpdateMsg deviceUpdateMsg = deviceMsgConstructor
|
|
||||||
.constructDeviceUpdatedMsg(UpdateMsgType.ENTITY_MERGE_RPC_MESSAGE, device, conflictName);
|
|
||||||
return DownlinkMsg.newBuilder()
|
|
||||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
|
||||||
.addDeviceUpdateMsg(deviceUpdateMsg)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
public ListenableFuture<Void> processDeviceNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
public ListenableFuture<Void> processDeviceNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||||
return processEntityNotification(tenantId, edgeNotificationMsg);
|
return processEntityNotification(tenantId, edgeNotificationMsg);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -501,7 +501,6 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
|
|||||||
Assert.assertTrue(deviceUpdateMsgOpt.isPresent());
|
Assert.assertTrue(deviceUpdateMsgOpt.isPresent());
|
||||||
DeviceUpdateMsg latestDeviceUpdateMsg = deviceUpdateMsgOpt.get();
|
DeviceUpdateMsg latestDeviceUpdateMsg = deviceUpdateMsgOpt.get();
|
||||||
Assert.assertNotEquals(deviceOnCloudName, latestDeviceUpdateMsg.getName());
|
Assert.assertNotEquals(deviceOnCloudName, latestDeviceUpdateMsg.getName());
|
||||||
Assert.assertEquals(deviceOnCloudName, latestDeviceUpdateMsg.getConflictName());
|
|
||||||
|
|
||||||
UUID newDeviceId = new UUID(latestDeviceUpdateMsg.getIdMSB(), latestDeviceUpdateMsg.getIdLSB());
|
UUID newDeviceId = new UUID(latestDeviceUpdateMsg.getIdMSB(), latestDeviceUpdateMsg.getIdLSB());
|
||||||
|
|
||||||
|
|||||||
@ -33,6 +33,5 @@ public enum EdgeEventActionType {
|
|||||||
ALARM_CLEAR,
|
ALARM_CLEAR,
|
||||||
ASSIGNED_TO_EDGE,
|
ASSIGNED_TO_EDGE,
|
||||||
UNASSIGNED_FROM_EDGE,
|
UNASSIGNED_FROM_EDGE,
|
||||||
CREDENTIALS_REQUEST,
|
CREDENTIALS_REQUEST
|
||||||
ENTITY_MERGE_REQUEST
|
|
||||||
}
|
}
|
||||||
@ -111,7 +111,6 @@ enum UpdateMsgType {
|
|||||||
ENTITY_DELETED_RPC_MESSAGE = 2;
|
ENTITY_DELETED_RPC_MESSAGE = 2;
|
||||||
ALARM_ACK_RPC_MESSAGE = 3;
|
ALARM_ACK_RPC_MESSAGE = 3;
|
||||||
ALARM_CLEAR_RPC_MESSAGE = 4;
|
ALARM_CLEAR_RPC_MESSAGE = 4;
|
||||||
ENTITY_MERGE_RPC_MESSAGE = 5;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
message EntityDataProto {
|
message EntityDataProto {
|
||||||
@ -199,7 +198,7 @@ message DeviceUpdateMsg {
|
|||||||
string type = 9;
|
string type = 9;
|
||||||
optional string label = 10;
|
optional string label = 10;
|
||||||
optional string additionalInfo = 11;
|
optional string additionalInfo = 11;
|
||||||
optional string conflictName = 12;
|
optional string conflictName = 12; // deprecated
|
||||||
optional int64 firmwareIdMSB = 13;
|
optional int64 firmwareIdMSB = 13;
|
||||||
optional int64 firmwareIdLSB = 14;
|
optional int64 firmwareIdLSB = 14;
|
||||||
optional bytes deviceDataBytes = 15;
|
optional bytes deviceDataBytes = 15;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user