Device created on the edge will use that same ID on the cloud

This commit is contained in:
Volodymyr Babak 2021-06-04 18:25:02 +03:00
parent 14b19d428f
commit 77c080f82b
8 changed files with 84 additions and 195 deletions

View File

@ -1,167 +0,0 @@
{
"ruleChain": {
"additionalInfo": null,
"name": "Edge Root Rule Chain",
"type": "EDGE",
"firstRuleNodeId": null,
"root": true,
"debugMode": false,
"configuration": null
},
"metadata": {
"firstNodeIndex": 0,
"nodes": [
{
"additionalInfo": {
"description": "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type.",
"layoutX": 187,
"layoutY": 468
},
"type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode",
"name": "Device Profile Node",
"debugMode": false,
"configuration": {
"persistAlarmRulesState": false,
"fetchAlarmRulesStateOnStart": false
}
},
{
"additionalInfo": {
"layoutX": 823,
"layoutY": 157
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode",
"name": "Save Timeseries",
"debugMode": false,
"configuration": {
"defaultTTL": 0
}
},
{
"additionalInfo": {
"layoutX": 824,
"layoutY": 52
},
"type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode",
"name": "Save Client Attributes",
"debugMode": false,
"configuration": {
"scope": "CLIENT_SCOPE"
}
},
{
"additionalInfo": {
"layoutX": 347,
"layoutY": 149
},
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode",
"name": "Message Type Switch",
"debugMode": false,
"configuration": {
"version": 0
}
},
{
"additionalInfo": {
"layoutX": 825,
"layoutY": 266
},
"type": "org.thingsboard.rule.engine.action.TbLogNode",
"name": "Log RPC from Device",
"debugMode": false,
"configuration": {
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
}
},
{
"additionalInfo": {
"layoutX": 824,
"layoutY": 378
},
"type": "org.thingsboard.rule.engine.action.TbLogNode",
"name": "Log Other",
"debugMode": false,
"configuration": {
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
}
},
{
"additionalInfo": {
"layoutX": 824,
"layoutY": 466
},
"type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode",
"name": "RPC Call Request",
"debugMode": false,
"configuration": {
"timeoutInSeconds": 60
}
},
{
"additionalInfo": {
"layoutX": 1129,
"layoutY": 52
},
"type": "org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode",
"name": "Push to cloud",
"debugMode": false,
"configuration": {
"scope": "SERVER_SCOPE"
}
}
],
"connections": [
{
"fromIndex": 0,
"toIndex": 3,
"type": "Success"
},
{
"fromIndex": 1,
"toIndex": 7,
"type": "Success"
},
{
"fromIndex": 2,
"toIndex": 7,
"type": "Success"
},
{
"fromIndex": 3,
"toIndex": 6,
"type": "RPC Request to Device"
},
{
"fromIndex": 3,
"toIndex": 5,
"type": "Other"
},
{
"fromIndex": 3,
"toIndex": 2,
"type": "Post attributes"
},
{
"fromIndex": 3,
"toIndex": 1,
"type": "Post telemetry"
},
{
"fromIndex": 3,
"toIndex": 4,
"type": "RPC Request from Device"
},
{
"fromIndex": 3,
"toIndex": 7,
"type": "Attributes Updated"
},
{
"fromIndex": 4,
"toIndex": 7,
"type": "Success"
}
],
"ruleChainConnections": null
}
}

View File

@ -87,6 +87,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -114,7 +115,7 @@ public final class EdgeGrpcSession implements Closeable {
private final Consumer<EdgeId> sessionCloseListener; private final Consumer<EdgeId> sessionCloseListener;
private final ObjectMapper mapper; private final ObjectMapper mapper;
private final Map<Integer, DownlinkMsg> pendingMsgsMap = new HashMap<>(); private final Map<Integer, DownlinkMsg> pendingMsgsMap = new LinkedHashMap<>();
private EdgeContextComponent ctx; private EdgeContextComponent ctx;
private Edge edge; private Edge edge;
@ -347,7 +348,7 @@ public final class EdgeGrpcSession implements Closeable {
do { do {
log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, pendingMsgsMap.values().size()); log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, pendingMsgsMap.values().size());
latch = new CountDownLatch(pendingMsgsMap.values().size()); latch = new CountDownLatch(pendingMsgsMap.values().size());
Collection<DownlinkMsg> copy = new ArrayList<>(pendingMsgsMap.values()); List<DownlinkMsg> copy = new ArrayList<>(pendingMsgsMap.values());
for (DownlinkMsg downlinkMsg : copy) { for (DownlinkMsg downlinkMsg : copy) {
sendDownlinkMsg(ResponseMsg.newBuilder() sendDownlinkMsg(ResponseMsg.newBuilder()
.setDownlinkMsg(downlinkMsg) .setDownlinkMsg(downlinkMsg)

View File

@ -15,8 +15,10 @@
*/ */
package org.thingsboard.server.service.edge.rpc.processor; package org.thingsboard.server.service.edge.rpc.processor;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
@ -92,7 +94,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
pageLink = pageLink.nextPageLink(); pageLink = pageLink.nextPageLink();
} }
} }
Device newDevice;
if (update) { if (update) {
log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " + log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " +
"deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg); "deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg);
@ -101,16 +103,28 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." + log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." +
"Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg); "Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg);
String newDeviceName = deviceUpdateMsg.getName() + "_" + RandomStringUtils.randomAlphabetic(15); String newDeviceName = deviceUpdateMsg.getName() + "_" + RandomStringUtils.randomAlphabetic(15);
newDevice = createDevice(tenantId, edge, deviceUpdateMsg, newDeviceName); Device newDevice = createDevice(tenantId, edge, deviceUpdateMsg, newDeviceName);
ObjectNode body = mapper.createObjectNode(); ObjectNode body = mapper.createObjectNode();
body.put("conflictName", deviceName); body.put("conflictName", deviceName);
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body); ListenableFuture<EdgeEvent> future =
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(EdgeEvent edgeEvent) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null);
}
@Override
public void onFailure(Throwable t) {
log.error("[{}] Failed to save ENTITY_MERGE_REQUEST edge event [{}][{}]", tenantId, deviceUpdateMsg, edge.getId(), t);
}
}, dbCallbackExecutorService);
} }
} while (pageData != null && pageData.hasNext()); } while (pageData != null && pageData.hasNext());
} else { } else {
log.info("[{}] Creating new device and replacing device entity on the edge [{}]", tenantId, deviceUpdateMsg); log.info("[{}] Creating new device and replacing device entity on the edge [{}]", tenantId, deviceUpdateMsg);
device = createDevice(tenantId, edge, deviceUpdateMsg, deviceUpdateMsg.getName()); device = createDevice(tenantId, edge, deviceUpdateMsg, deviceUpdateMsg.getName());
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, device.getId(), null); saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, device.getId(), null);
} }
break; break;
case ENTITY_UPDATED_RPC_MESSAGE: case ENTITY_UPDATED_RPC_MESSAGE:
@ -179,8 +193,16 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
try { try {
deviceCreationLock.lock(); deviceCreationLock.lock();
log.debug("[{}] Creating device entity [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); log.debug("[{}] Creating device entity [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
device = new Device(); DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
device.setTenantId(edge.getTenantId()); device = deviceService.findDeviceById(tenantId, deviceId);
boolean created = false;
if (device == null) {
device = new Device();
device.setTenantId(tenantId);
device.setId(deviceId);
device.setCreatedTime(Uuids.unixTimestamp(deviceId.getId()));
created = true;
}
// make device private, if edge is public // make device private, if edge is public
device.setCustomerId(getCustomerId(edge)); device.setCustomerId(getCustomerId(edge));
device.setName(deviceName); device.setName(deviceName);
@ -192,9 +214,17 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
new UUID(deviceUpdateMsg.getDeviceProfileIdMSB(), deviceUpdateMsg.getDeviceProfileIdLSB())); new UUID(deviceUpdateMsg.getDeviceProfileIdMSB(), deviceUpdateMsg.getDeviceProfileIdLSB()));
device.setDeviceProfileId(deviceProfileId); device.setDeviceProfileId(deviceProfileId);
} }
device = deviceService.saveDevice(device); Device savedDevice = deviceService.saveDevice(device, false);
if (created) {
DeviceCredentials deviceCredentials = new DeviceCredentials();
deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId()));
deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
deviceCredentials.setCredentialsId(org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric(20));
deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials);
deviceStateService.onDeviceAdded(savedDevice);
}
createRelationFromEdge(tenantId, edge.getId(), device.getId()); createRelationFromEdge(tenantId, edge.getId(), device.getId());
deviceStateService.onDeviceAdded(device);
pushDeviceCreatedEventToRuleEngine(tenantId, edge, device); pushDeviceCreatedEventToRuleEngine(tenantId, edge, device);
deviceService.assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId()); deviceService.assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId());
} finally { } finally {

View File

@ -177,9 +177,7 @@ public abstract class AbstractOAuth2ClientMapper {
tenant.setTitle(tenantName); tenant.setTitle(tenantName);
tenant = tenantService.saveTenant(tenant); tenant = tenantService.saveTenant(tenant);
installScripts.createDefaultRuleChains(tenant.getId()); installScripts.createDefaultRuleChains(tenant.getId());
if (edgesEnabled) { installScripts.createDefaultEdgeRuleChains(tenant.getId());
installScripts.createDefaultEdgeRuleChains(tenant.getId());
}
tenantProfileCache.evict(tenant.getId()); tenantProfileCache.evict(tenant.getId());
tbClusterService.onTenantChange(tenant, null); tbClusterService.onTenantChange(tenant, null);
tbClusterService.onEntityStateChange(tenant.getId(), tenant.getId(), tbClusterService.onEntityStateChange(tenant.getId(), tenant.getId(),

View File

@ -1161,11 +1161,12 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
Assert.assertTrue(edgeImitator.waitForMessages()); Assert.assertTrue(edgeImitator.waitForMessages());
AbstractMessage latestMessage = edgeImitator.getLatestMessage(); AbstractMessage latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg); Assert.assertTrue(latestMessage instanceof DeviceCredentialsRequestMsg);
DeviceUpdateMsg latestDeviceUpdateMsg = (DeviceUpdateMsg) latestMessage; DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = (DeviceCredentialsRequestMsg) latestMessage;
Assert.assertEquals("Edge Device 2", latestDeviceUpdateMsg.getName()); Assert.assertEquals(uuid.getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB());
Assert.assertEquals(uuid.getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB());
UUID newDeviceId = new UUID(latestDeviceUpdateMsg.getIdMSB(), latestDeviceUpdateMsg.getIdLSB()); UUID newDeviceId = new UUID(latestDeviceCredentialsRequestMsg.getDeviceIdMSB(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB());
Device device = doGet("/api/device/" + newDeviceId, Device.class); Device device = doGet("/api/device/" + newDeviceId, Device.class);
Assert.assertNotNull(device); Assert.assertNotNull(device);
@ -1189,7 +1190,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
uplinkMsgBuilder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build()); uplinkMsgBuilder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build());
edgeImitator.expectResponsesAmount(1); edgeImitator.expectResponsesAmount(1);
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(2);
testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
@ -1197,7 +1198,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
Assert.assertTrue(edgeImitator.waitForResponses()); Assert.assertTrue(edgeImitator.waitForResponses());
Assert.assertTrue(edgeImitator.waitForMessages()); Assert.assertTrue(edgeImitator.waitForMessages());
AbstractMessage latestMessage = edgeImitator.getLatestMessage(); AbstractMessage latestMessage = edgeImitator.getMessageFromTail(2);
Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg); Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg);
DeviceUpdateMsg latestDeviceUpdateMsg = (DeviceUpdateMsg) latestMessage; DeviceUpdateMsg latestDeviceUpdateMsg = (DeviceUpdateMsg) latestMessage;
Assert.assertNotEquals(deviceOnCloudName, latestDeviceUpdateMsg.getName()); Assert.assertNotEquals(deviceOnCloudName, latestDeviceUpdateMsg.getName());
@ -1210,6 +1211,18 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
Device device = doGet("/api/device/" + newDeviceId, Device.class); Device device = doGet("/api/device/" + newDeviceId, Device.class);
Assert.assertNotNull(device); Assert.assertNotNull(device);
Assert.assertNotEquals(deviceOnCloudName, device.getName()); Assert.assertNotEquals(deviceOnCloudName, device.getName());
latestMessage = edgeImitator.getLatestMessage();
Assert.assertTrue(latestMessage instanceof DeviceCredentialsRequestMsg);
DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = (DeviceCredentialsRequestMsg) latestMessage;
Assert.assertEquals(uuid.getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB());
Assert.assertEquals(uuid.getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB());
newDeviceId = new UUID(latestDeviceCredentialsRequestMsg.getDeviceIdMSB(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB());
device = doGet("/api/device/" + newDeviceId, Device.class);
Assert.assertNotNull(device);
Assert.assertNotEquals(deviceOnCloudName, device.getName());
} }
private void sendRelationRequest() throws Exception { private void sendRelationRequest() throws Exception {

View File

@ -334,7 +334,11 @@ public class EdgeImitator {
} }
public AbstractMessage getLatestMessage() { public AbstractMessage getLatestMessage() {
return downlinkMsgs.get(downlinkMsgs.size() - 1); return getMessageFromTail(1);
}
public AbstractMessage getMessageFromTail(int offset) {
return downlinkMsgs.get(downlinkMsgs.size() - offset);
} }
public void ignoreType(Class<? extends AbstractMessage> type) { public void ignoreType(Class<? extends AbstractMessage> type) {

View File

@ -46,6 +46,8 @@ public interface DeviceService {
Device findDeviceByTenantIdAndName(TenantId tenantId, String name); Device findDeviceByTenantIdAndName(TenantId tenantId, String name);
Device saveDevice(Device device, boolean doValidate);
Device saveDevice(Device device); Device saveDevice(Device device);
Device saveDeviceWithAccessToken(Device device, String accessToken); Device saveDeviceWithAccessToken(Device device, String accessToken);

View File

@ -181,13 +181,19 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
@CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}") @CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}")
@Override @Override
public Device saveDeviceWithAccessToken(Device device, String accessToken) { public Device saveDeviceWithAccessToken(Device device, String accessToken) {
return doSaveDevice(device, accessToken); return doSaveDevice(device, accessToken, true);
}
@CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}")
@Override
public Device saveDevice(Device device, boolean doValidate) {
return doSaveDevice(device, null, doValidate);
} }
@CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}") @CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}")
@Override @Override
public Device saveDevice(Device device) { public Device saveDevice(Device device) {
return doSaveDevice(device, null); return doSaveDevice(device, null, true);
} }
@CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}") @CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}")
@ -197,7 +203,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
Device deviceWithName = this.findDeviceByTenantIdAndName(device.getTenantId(), device.getName()); Device deviceWithName = this.findDeviceByTenantIdAndName(device.getTenantId(), device.getName());
device = deviceWithName == null ? device : deviceWithName.updateDevice(device); device = deviceWithName == null ? device : deviceWithName.updateDevice(device);
} }
Device savedDevice = this.saveDeviceWithoutCredentials(device); Device savedDevice = this.saveDeviceWithoutCredentials(device, true);
deviceCredentials.setDeviceId(savedDevice.getId()); deviceCredentials.setDeviceId(savedDevice.getId());
if (device.getId() == null) { if (device.getId() == null) {
deviceCredentialsService.createDeviceCredentials(savedDevice.getTenantId(), deviceCredentials); deviceCredentialsService.createDeviceCredentials(savedDevice.getTenantId(), deviceCredentials);
@ -212,8 +218,8 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
return savedDevice; return savedDevice;
} }
private Device doSaveDevice(Device device, String accessToken) { private Device doSaveDevice(Device device, String accessToken, boolean doValidate) {
Device savedDevice = this.saveDeviceWithoutCredentials(device); Device savedDevice = this.saveDeviceWithoutCredentials(device, doValidate);
if (device.getId() == null) { if (device.getId() == null) {
DeviceCredentials deviceCredentials = new DeviceCredentials(); DeviceCredentials deviceCredentials = new DeviceCredentials();
deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId())); deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId()));
@ -224,9 +230,11 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
return savedDevice; return savedDevice;
} }
private Device saveDeviceWithoutCredentials(Device device) { private Device saveDeviceWithoutCredentials(Device device, boolean doValidate) {
log.trace("Executing saveDevice [{}]", device); log.trace("Executing saveDevice [{}]", device);
deviceValidator.validate(device, Device::getTenantId); if (doValidate) {
deviceValidator.validate(device, Device::getTenantId);
}
try { try {
DeviceProfile deviceProfile; DeviceProfile deviceProfile;
if (device.getDeviceProfileId() == null) { if (device.getDeviceProfileId() == null) {
@ -542,7 +550,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
device.setTenantId(tenantId); device.setTenantId(tenantId);
device.setCustomerId(null); device.setCustomerId(null);
return doSaveDevice(device, null); return doSaveDevice(device, null, true);
} }
@Override @Override