For devices from edge new id generated every time

This commit is contained in:
Volodymyr Babak 2020-12-15 18:32:35 +02:00
parent b52d2aae17
commit 59c55fb2c5
8 changed files with 159 additions and 63 deletions

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc;
import com.datastax.driver.core.utils.UUIDs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -335,8 +336,8 @@ public final class EdgeGrpcSession implements Closeable {
case CREDENTIALS_REQUEST:
downlinkMsg = processCredentialsRequestMessage(edgeEvent);
break;
case ENTITY_EXISTS_REQUEST:
downlinkMsg = processEntityExistsRequestMessage(edgeEvent);
case ENTITY_MERGE_REQUEST:
downlinkMsg = processEntityMergeRequestMessage(edgeEvent);
break;
case RPC_CALL:
downlinkMsg = processRpcCallMsg(edgeEvent);
@ -352,13 +353,18 @@ public final class EdgeGrpcSession implements Closeable {
return result;
}
private DownlinkMsg processEntityExistsRequestMessage(EdgeEvent edgeEvent) {
private DownlinkMsg processEntityMergeRequestMessage(EdgeEvent edgeEvent) {
DownlinkMsg downlinkMsg = null;
if (EdgeEventType.DEVICE.equals(edgeEvent.getType())) {
DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
Device device = ctx.getDeviceService().findDeviceById(edge.getTenantId(), deviceId);
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(device);
DeviceUpdateMsg d = ctx.getDeviceMsgConstructor().constructDeviceUpdatedMsg(UpdateMsgType.DEVICE_CONFLICT_RPC_MESSAGE, device, customerId);
String conflictName = null;
if(edgeEvent.getBody() != null) {
conflictName = edgeEvent.getBody().get("conflictName").asText();
}
DeviceUpdateMsg d = ctx.getDeviceMsgConstructor()
.constructDeviceUpdatedMsg(UpdateMsgType.ENTITY_MERGE_RPC_MESSAGE, device, customerId, conflictName);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceUpdateMsg(Collections.singletonList(d))
.build();
@ -497,7 +503,7 @@ public final class EdgeGrpcSession implements Closeable {
if (device != null) {
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(device);
DeviceUpdateMsg deviceUpdateMsg =
ctx.getDeviceMsgConstructor().constructDeviceUpdatedMsg(msgType, device, customerId);
ctx.getDeviceMsgConstructor().constructDeviceUpdatedMsg(msgType, device, customerId, null);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg))
.build();

View File

@ -38,7 +38,7 @@ public class DeviceMsgConstructor {
protected static final ObjectMapper mapper = new ObjectMapper();
public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device, CustomerId customerId) {
public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device, CustomerId customerId, String conflictName) {
DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder()
.setMsgType(msgType)
.setIdMSB(device.getId().getId().getMostSignificantBits())
@ -55,6 +55,9 @@ public class DeviceMsgConstructor {
if (device.getAdditionalInfo() != null) {
builder.setAdditionalInfo(JacksonUtil.toString(device.getAdditionalInfo()));
}
if (conflictName != null) {
builder.setConflictName(conflictName);
}
return builder.build();
}

View File

@ -37,6 +37,7 @@ import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.edge.EdgeEventService;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.user.UserService;
@ -64,6 +65,9 @@ public abstract class BaseProcessor {
@Autowired
protected EntityViewService entityViewService;
@Autowired
protected EdgeService edgeService;
@Autowired
protected CustomerService customerService;

View File

@ -17,12 +17,15 @@ package org.thingsboard.server.service.edge.rpc.processor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.security.core.parameters.P;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.server.common.data.DataConstants;
@ -52,6 +55,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
@ -64,36 +68,62 @@ public class DeviceProcessor extends BaseProcessor {
public ListenableFuture<Void> onDeviceUpdate(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
log.trace("[{}] onDeviceUpdate [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
DeviceId edgeDeviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
switch (deviceUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
String deviceName = deviceUpdateMsg.getName();
Device device = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName);
if (device != null) {
log.info("[{}] Device with name '{}' already exists on the cloud. Updating id of device entity on the edge", tenantId, deviceName);
if (!device.getId().equals(edgeDeviceId)) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_EXISTS_REQUEST, device.getId(), null);
ListenableFuture<List<EdgeId>> future = edgeService.findRelatedEdgeIdsByEntityId(tenantId, device.getId());
SettableFuture<Void> futureToSet = SettableFuture.create();
Futures.addCallback(future, new FutureCallback<List<EdgeId>>() {
@Override
public void onSuccess(@Nullable List<EdgeId> edgeIds) {
boolean update = false;
if (edgeIds != null && !edgeIds.isEmpty()) {
if (edgeIds.contains(edge.getId())) {
update = true;
}
}
Device device;
if (update) {
log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " +
"deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg);
updateDevice(tenantId, edge, deviceUpdateMsg);
} else {
Device deviceById = deviceService.findDeviceById(edge.getTenantId(), edgeDeviceId);
if (deviceById != null) {
log.info("[{}] Device ID [{}] already used by other device on the cloud. Creating new device and replacing device entity on the edge", tenantId, edgeDeviceId.getId());
device = createDevice(tenantId, edge, deviceUpdateMsg);
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_EXISTS_REQUEST, device.getId(), null);
} else {
device = createDevice(tenantId, edge, deviceUpdateMsg);
}
}
// TODO: voba - assign device only in case device is not assigned yet. Missing functionality to check this relation prior assignment
log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." +
"Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg);
String newDeviceName = deviceUpdateMsg.getName() + "_" + RandomStringUtils.randomAlphabetic(15);
device = createDevice(tenantId, edge, deviceUpdateMsg, newDeviceName);
ObjectNode body = mapper.createObjectNode();
body.put("conflictName", deviceName);
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, device.getId(), body);
deviceService.assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId());
}
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("[{}] Failed to get related edge ids by device id [{}], edge [{}]", tenantId, deviceUpdateMsg, edge.getId(), t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
return futureToSet;
} else {
log.info("[{}] Creating new device and replacing device entity on the edge [{}]", tenantId, deviceUpdateMsg);
device = createDevice(tenantId, edge, deviceUpdateMsg, deviceUpdateMsg.getName());
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, device.getId(), null);
deviceService.assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId());
}
break;
case ENTITY_UPDATED_RPC_MESSAGE:
updateDevice(tenantId, edge, deviceUpdateMsg);
break;
case ENTITY_DELETED_RPC_MESSAGE:
Device deviceToDelete = deviceService.findDeviceById(tenantId, edgeDeviceId);
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId);
if (deviceToDelete != null) {
deviceService.unassignDeviceFromEdge(tenantId, edgeDeviceId, edge.getId());
deviceService.unassignDeviceFromEdge(tenantId, deviceId, edge.getId());
}
break;
case UNRECOGNIZED:
@ -103,7 +133,6 @@ public class DeviceProcessor extends BaseProcessor {
return Futures.immediateFuture(null);
}
public ListenableFuture<Void> onDeviceCredentialsUpdate(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
log.debug("Executing onDeviceCredentialsUpdate, deviceCredentialsUpdateMsg [{}]", deviceCredentialsUpdateMsg);
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB()));
@ -131,36 +160,34 @@ public class DeviceProcessor extends BaseProcessor {
private void updateDevice(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
Device device = deviceService.findDeviceById(tenantId, deviceId);
if (device != null) {
device.setName(deviceUpdateMsg.getName());
device.setType(deviceUpdateMsg.getType());
device.setLabel(deviceUpdateMsg.getLabel());
device.setAdditionalInfo(JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo()));
deviceService.saveDevice(device);
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null);
} else {
log.warn("[{}] can't find device [{}], edge [{}]", tenantId, deviceUpdateMsg, edge.getId());
}
}
private Device createDevice(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
private Device createDevice(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg, String deviceName) {
Device device;
try {
deviceCreationLock.lock();
log.debug("[{}] Creating device entity [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
device = new Device();
device.setTenantId(edge.getTenantId());
device.setCustomerId(edge.getCustomerId());
device.setId(deviceId);
device.setName(deviceUpdateMsg.getName());
device.setName(deviceName);
device.setType(deviceUpdateMsg.getType());
device.setLabel(deviceUpdateMsg.getLabel());
device.setAdditionalInfo(JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo()));
device = deviceService.saveDevice(device);
createDeviceCredentials(device);
createRelationFromEdge(tenantId, edge.getId(), device.getId());
deviceStateService.onDeviceAdded(device);
pushDeviceCreatedEventToRuleEngine(tenantId, edge, device);
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null);
} finally {
deviceCreationLock.unlock();
}
@ -176,14 +203,6 @@ public class DeviceProcessor extends BaseProcessor {
relationService.saveRelation(tenantId, relation);
}
private void createDeviceCredentials(Device device) {
DeviceCredentials deviceCredentials = new DeviceCredentials();
deviceCredentials.setDeviceId(device.getId());
deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
deviceCredentials.setCredentialsId(RandomStringUtils.randomAlphanumeric(20));
deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials);
}
private void pushDeviceCreatedEventToRuleEngine(TenantId tenantId, Edge edge, Device device) {
try {
DeviceId deviceId = device.getId();

View File

@ -26,6 +26,7 @@ import com.google.protobuf.AbstractMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -48,6 +49,7 @@ import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
@ -964,6 +966,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
private void testSendMessagesToCloud() throws Exception {
log.info("Sending messages to cloud");
sendDevice();
sendDeviceWithNameThatAlreadyExistsOnCloud();
sendRelationRequest();
sendAlarm();
sendTelemetry();
@ -992,17 +995,66 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
uplinkMsgBuilder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build());
edgeImitator.expectResponsesAmount(1);
edgeImitator.expectMessageAmount(1);
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
edgeImitator.waitForResponses();
Device device = doGet("/api/device/" + uuid.toString(), Device.class);
edgeImitator.waitForResponses();
edgeImitator.waitForMessages();
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg);
DeviceUpdateMsg latestDeviceUpdateMsg = (DeviceUpdateMsg) latestMessage;
Assert.assertEquals("Edge Device 2", latestDeviceUpdateMsg.getName());
UUID newDeviceId = new UUID(latestDeviceUpdateMsg.getIdMSB(), latestDeviceUpdateMsg.getIdLSB());
Device device = doGet("/api/device/" + newDeviceId, Device.class);
Assert.assertNotNull(device);
Assert.assertEquals("Edge Device 2", device.getName());
}
private void sendDeviceWithNameThatAlreadyExistsOnCloud() throws Exception {
String deviceOnCloudName = RandomStringUtils.randomAlphanumeric(15);
Device deviceOnCloud = saveDevice(deviceOnCloudName);
UUID uuid = UUIDs.timeBased();
UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
DeviceUpdateMsg.Builder deviceUpdateMsgBuilder = DeviceUpdateMsg.newBuilder();
deviceUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits());
deviceUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits());
deviceUpdateMsgBuilder.setName(deviceOnCloudName);
deviceUpdateMsgBuilder.setType("test");
deviceUpdateMsgBuilder.setMsgType(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE);
testAutoGeneratedCodeByProtobuf(deviceUpdateMsgBuilder);
uplinkMsgBuilder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build());
edgeImitator.expectResponsesAmount(1);
edgeImitator.expectMessageAmount(1);
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
edgeImitator.waitForResponses();
edgeImitator.waitForMessages();
AbstractMessage latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg);
DeviceUpdateMsg latestDeviceUpdateMsg = (DeviceUpdateMsg) latestMessage;
Assert.assertNotEquals(deviceOnCloudName, latestDeviceUpdateMsg.getName());
Assert.assertEquals(deviceOnCloudName, latestDeviceUpdateMsg.getConflictName());
UUID newDeviceId = new UUID(latestDeviceUpdateMsg.getIdMSB(), latestDeviceUpdateMsg.getIdLSB());
Assert.assertNotEquals(deviceOnCloud.getId().getId(), newDeviceId);
Device device = doGet("/api/device/" + newDeviceId, Device.class);
Assert.assertNotNull(device);
Assert.assertNotEquals(deviceOnCloudName, device.getName());
}
private void sendRelationRequest() throws Exception {
Device device = findDeviceByName("Edge Device 1");
Asset asset = findAssetByName("Edge Asset 1");

View File

@ -34,5 +34,5 @@ public enum EdgeEventActionType {
ASSIGNED_TO_EDGE,
UNASSIGNED_FROM_EDGE,
CREDENTIALS_REQUEST,
ENTITY_EXISTS_REQUEST
ENTITY_MERGE_REQUEST
}

View File

@ -97,7 +97,7 @@ enum UpdateMsgType {
ENTITY_DELETED_RPC_MESSAGE = 2;
ALARM_ACK_RPC_MESSAGE = 3;
ALARM_CLEAR_RPC_MESSAGE = 4;
DEVICE_CONFLICT_RPC_MESSAGE = 5;
ENTITY_MERGE_RPC_MESSAGE = 5;
}
message EntityDataProto {
@ -182,6 +182,7 @@ message DeviceUpdateMsg {
string type = 7;
string label = 8;
string additionalInfo = 9;
string conflictName = 10;
}
message DeviceCredentialsUpdateMsg {

View File

@ -87,6 +87,7 @@ import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.common.data.security.model.SecuritySettings;
@ -97,6 +98,7 @@ import org.thingsboard.server.common.data.widget.WidgetsBundle;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -626,22 +628,31 @@ public class RestClient implements ClientHttpRequestInterceptor, Closeable {
}
public List<ComponentDescriptor> getComponentDescriptorsByType(ComponentType componentType) {
return getComponentDescriptorsByType(componentType, RuleChainType.CORE);
}
public List<ComponentDescriptor> getComponentDescriptorsByType(ComponentType componentType, RuleChainType ruleChainType) {
return restTemplate.exchange(
baseURL + "/api/components?componentType={componentType}",
baseURL + "/api/components/" + componentType.name() + "/?ruleChainType={ruleChainType}",
HttpMethod.GET, HttpEntity.EMPTY,
new ParameterizedTypeReference<List<ComponentDescriptor>>() {
},
componentType).getBody();
ruleChainType).getBody();
}
public List<ComponentDescriptor> getComponentDescriptorsByTypes(List<ComponentType> componentTypes) {
return getComponentDescriptorsByTypes(componentTypes, RuleChainType.CORE);
}
public List<ComponentDescriptor> getComponentDescriptorsByTypes(List<ComponentType> componentTypes, RuleChainType ruleChainType) {
return restTemplate.exchange(
baseURL + "/api/components?componentTypes={componentTypes}",
baseURL + "/api/components?componentTypes={componentTypes}&ruleChainType={ruleChainType}",
HttpMethod.GET,
HttpEntity.EMPTY,
new ParameterizedTypeReference<List<ComponentDescriptor>>() {
},
listEnumToString(componentTypes))
listEnumToString(componentTypes),
ruleChainType)
.getBody();
}