diff --git a/application/src/main/data/json/demo/edge_management/rule_chains/edge_root_rule_chain.json b/application/src/main/data/json/demo/edge_management/rule_chains/edge_root_rule_chain.json deleted file mode 100644 index a065279cce..0000000000 --- a/application/src/main/data/json/demo/edge_management/rule_chains/edge_root_rule_chain.json +++ /dev/null @@ -1,167 +0,0 @@ -{ - "ruleChain": { - "additionalInfo": null, - "name": "Edge Root Rule Chain", - "type": "EDGE", - "firstRuleNodeId": null, - "root": true, - "debugMode": false, - "configuration": null - }, - "metadata": { - "firstNodeIndex": 0, - "nodes": [ - { - "additionalInfo": { - "description": "Process incoming messages from devices with the alarm rules defined in the device profile. Dispatch all incoming messages with \"Success\" relation type.", - "layoutX": 187, - "layoutY": 468 - }, - "type": "org.thingsboard.rule.engine.profile.TbDeviceProfileNode", - "name": "Device Profile Node", - "debugMode": false, - "configuration": { - "persistAlarmRulesState": false, - "fetchAlarmRulesStateOnStart": false - } - }, - { - "additionalInfo": { - "layoutX": 823, - "layoutY": 157 - }, - "type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", - "name": "Save Timeseries", - "debugMode": false, - "configuration": { - "defaultTTL": 0 - } - }, - { - "additionalInfo": { - "layoutX": 824, - "layoutY": 52 - }, - "type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode", - "name": "Save Client Attributes", - "debugMode": false, - "configuration": { - "scope": "CLIENT_SCOPE" - } - }, - { - "additionalInfo": { - "layoutX": 347, - "layoutY": 149 - }, - "type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", - "name": "Message Type Switch", - "debugMode": false, - "configuration": { - "version": 0 - } - }, - { - "additionalInfo": { - "layoutX": 825, - "layoutY": 266 - }, - "type": "org.thingsboard.rule.engine.action.TbLogNode", - "name": "Log RPC from Device", - "debugMode": false, - "configuration": { - "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);" - } - }, - { - "additionalInfo": { - "layoutX": 824, - "layoutY": 378 - }, - "type": "org.thingsboard.rule.engine.action.TbLogNode", - "name": "Log Other", - "debugMode": false, - "configuration": { - "jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);" - } - }, - { - "additionalInfo": { - "layoutX": 824, - "layoutY": 466 - }, - "type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode", - "name": "RPC Call Request", - "debugMode": false, - "configuration": { - "timeoutInSeconds": 60 - } - }, - { - "additionalInfo": { - "layoutX": 1129, - "layoutY": 52 - }, - "type": "org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode", - "name": "Push to cloud", - "debugMode": false, - "configuration": { - "scope": "SERVER_SCOPE" - } - } - ], - "connections": [ - { - "fromIndex": 0, - "toIndex": 3, - "type": "Success" - }, - { - "fromIndex": 1, - "toIndex": 7, - "type": "Success" - }, - { - "fromIndex": 2, - "toIndex": 7, - "type": "Success" - }, - { - "fromIndex": 3, - "toIndex": 6, - "type": "RPC Request to Device" - }, - { - "fromIndex": 3, - "toIndex": 5, - "type": "Other" - }, - { - "fromIndex": 3, - "toIndex": 2, - "type": "Post attributes" - }, - { - "fromIndex": 3, - "toIndex": 1, - "type": "Post telemetry" - }, - { - "fromIndex": 3, - "toIndex": 4, - "type": "RPC Request from Device" - }, - { - "fromIndex": 3, - "toIndex": 7, - "type": "Attributes Updated" - }, - { - "fromIndex": 4, - "toIndex": 7, - "type": "Success" - } - ], - "ruleChainConnections": null - } -} \ No newline at end of file diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 15f591694a..66e6acca34 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -87,6 +87,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -114,7 +115,7 @@ public final class EdgeGrpcSession implements Closeable { private final Consumer sessionCloseListener; private final ObjectMapper mapper; - private final Map pendingMsgsMap = new HashMap<>(); + private final Map pendingMsgsMap = new LinkedHashMap<>(); private EdgeContextComponent ctx; private Edge edge; @@ -347,7 +348,7 @@ public final class EdgeGrpcSession implements Closeable { do { log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, pendingMsgsMap.values().size()); latch = new CountDownLatch(pendingMsgsMap.values().size()); - Collection copy = new ArrayList<>(pendingMsgsMap.values()); + List copy = new ArrayList<>(pendingMsgsMap.values()); for (DownlinkMsg downlinkMsg : copy) { sendDownlinkMsg(ResponseMsg.newBuilder() .setDownlinkMsg(downlinkMsg) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java index 013e50482b..eb569db885 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceEdgeProcessor.java @@ -15,8 +15,10 @@ */ package org.thingsboard.server.service.edge.rpc.processor; +import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -92,7 +94,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { pageLink = pageLink.nextPageLink(); } } - Device newDevice; + if (update) { log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " + "deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg); @@ -101,16 +103,28 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." + "Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg); String newDeviceName = deviceUpdateMsg.getName() + "_" + RandomStringUtils.randomAlphabetic(15); - newDevice = createDevice(tenantId, edge, deviceUpdateMsg, newDeviceName); + Device newDevice = createDevice(tenantId, edge, deviceUpdateMsg, newDeviceName); ObjectNode body = mapper.createObjectNode(); body.put("conflictName", deviceName); - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body); + ListenableFuture future = + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body); + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(EdgeEvent edgeEvent) { + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null); + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}] Failed to save ENTITY_MERGE_REQUEST edge event [{}][{}]", tenantId, deviceUpdateMsg, edge.getId(), t); + } + }, dbCallbackExecutorService); } } while (pageData != null && pageData.hasNext()); } else { log.info("[{}] Creating new device and replacing device entity on the edge [{}]", tenantId, deviceUpdateMsg); device = createDevice(tenantId, edge, deviceUpdateMsg, deviceUpdateMsg.getName()); - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, device.getId(), null); + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, device.getId(), null); } break; case ENTITY_UPDATED_RPC_MESSAGE: @@ -179,8 +193,16 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { try { deviceCreationLock.lock(); log.debug("[{}] Creating device entity [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); - device = new Device(); - device.setTenantId(edge.getTenantId()); + DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB())); + device = deviceService.findDeviceById(tenantId, deviceId); + boolean created = false; + if (device == null) { + device = new Device(); + device.setTenantId(tenantId); + device.setId(deviceId); + device.setCreatedTime(Uuids.unixTimestamp(deviceId.getId())); + created = true; + } // make device private, if edge is public device.setCustomerId(getCustomerId(edge)); device.setName(deviceName); @@ -192,9 +214,17 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor { new UUID(deviceUpdateMsg.getDeviceProfileIdMSB(), deviceUpdateMsg.getDeviceProfileIdLSB())); device.setDeviceProfileId(deviceProfileId); } - device = deviceService.saveDevice(device); + Device savedDevice = deviceService.saveDevice(device, false); + if (created) { + DeviceCredentials deviceCredentials = new DeviceCredentials(); + deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId())); + deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); + deviceCredentials.setCredentialsId(org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric(20)); + deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials); + + deviceStateService.onDeviceAdded(savedDevice); + } createRelationFromEdge(tenantId, edge.getId(), device.getId()); - deviceStateService.onDeviceAdded(device); pushDeviceCreatedEventToRuleEngine(tenantId, edge, device); deviceService.assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId()); } finally { diff --git a/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java b/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java index 4f38cf7a23..17f76127f6 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java +++ b/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java @@ -177,9 +177,7 @@ public abstract class AbstractOAuth2ClientMapper { tenant.setTitle(tenantName); tenant = tenantService.saveTenant(tenant); installScripts.createDefaultRuleChains(tenant.getId()); - if (edgesEnabled) { - installScripts.createDefaultEdgeRuleChains(tenant.getId()); - } + installScripts.createDefaultEdgeRuleChains(tenant.getId()); tenantProfileCache.evict(tenant.getId()); tbClusterService.onTenantChange(tenant, null); tbClusterService.onEntityStateChange(tenant.getId(), tenant.getId(), diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index 0e2f94eaf9..788a66acc0 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -1161,11 +1161,12 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { Assert.assertTrue(edgeImitator.waitForMessages()); AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg); - DeviceUpdateMsg latestDeviceUpdateMsg = (DeviceUpdateMsg) latestMessage; - Assert.assertEquals("Edge Device 2", latestDeviceUpdateMsg.getName()); + Assert.assertTrue(latestMessage instanceof DeviceCredentialsRequestMsg); + DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = (DeviceCredentialsRequestMsg) latestMessage; + Assert.assertEquals(uuid.getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB()); + Assert.assertEquals(uuid.getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB()); - UUID newDeviceId = new UUID(latestDeviceUpdateMsg.getIdMSB(), latestDeviceUpdateMsg.getIdLSB()); + UUID newDeviceId = new UUID(latestDeviceCredentialsRequestMsg.getDeviceIdMSB(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB()); Device device = doGet("/api/device/" + newDeviceId, Device.class); Assert.assertNotNull(device); @@ -1189,7 +1190,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { uplinkMsgBuilder.addDeviceUpdateMsg(deviceUpdateMsgBuilder.build()); edgeImitator.expectResponsesAmount(1); - edgeImitator.expectMessageAmount(1); + edgeImitator.expectMessageAmount(2); testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); @@ -1197,7 +1198,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { Assert.assertTrue(edgeImitator.waitForResponses()); Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + AbstractMessage latestMessage = edgeImitator.getMessageFromTail(2); Assert.assertTrue(latestMessage instanceof DeviceUpdateMsg); DeviceUpdateMsg latestDeviceUpdateMsg = (DeviceUpdateMsg) latestMessage; Assert.assertNotEquals(deviceOnCloudName, latestDeviceUpdateMsg.getName()); @@ -1210,6 +1211,18 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { Device device = doGet("/api/device/" + newDeviceId, Device.class); Assert.assertNotNull(device); Assert.assertNotEquals(deviceOnCloudName, device.getName()); + + latestMessage = edgeImitator.getLatestMessage(); + Assert.assertTrue(latestMessage instanceof DeviceCredentialsRequestMsg); + DeviceCredentialsRequestMsg latestDeviceCredentialsRequestMsg = (DeviceCredentialsRequestMsg) latestMessage; + Assert.assertEquals(uuid.getMostSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdMSB()); + Assert.assertEquals(uuid.getLeastSignificantBits(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB()); + + newDeviceId = new UUID(latestDeviceCredentialsRequestMsg.getDeviceIdMSB(), latestDeviceCredentialsRequestMsg.getDeviceIdLSB()); + + device = doGet("/api/device/" + newDeviceId, Device.class); + Assert.assertNotNull(device); + Assert.assertNotEquals(deviceOnCloudName, device.getName()); } private void sendRelationRequest() throws Exception { diff --git a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java index 4988f7d1cf..64d702501a 100644 --- a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java +++ b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java @@ -334,7 +334,11 @@ public class EdgeImitator { } public AbstractMessage getLatestMessage() { - return downlinkMsgs.get(downlinkMsgs.size() - 1); + return getMessageFromTail(1); + } + + public AbstractMessage getMessageFromTail(int offset) { + return downlinkMsgs.get(downlinkMsgs.size() - offset); } public void ignoreType(Class type) { diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java index 761211f32c..162959fa5a 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java @@ -46,6 +46,8 @@ public interface DeviceService { Device findDeviceByTenantIdAndName(TenantId tenantId, String name); + Device saveDevice(Device device, boolean doValidate); + Device saveDevice(Device device); Device saveDeviceWithAccessToken(Device device, String accessToken); diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java index 34aa462a6d..1081ae4c05 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java @@ -181,13 +181,19 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe @CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}") @Override public Device saveDeviceWithAccessToken(Device device, String accessToken) { - return doSaveDevice(device, accessToken); + return doSaveDevice(device, accessToken, true); + } + + @CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}") + @Override + public Device saveDevice(Device device, boolean doValidate) { + return doSaveDevice(device, null, doValidate); } @CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}") @Override public Device saveDevice(Device device) { - return doSaveDevice(device, null); + return doSaveDevice(device, null, true); } @CacheEvict(cacheNames = DEVICE_CACHE, key = "{#device.tenantId, #device.name}") @@ -197,7 +203,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe Device deviceWithName = this.findDeviceByTenantIdAndName(device.getTenantId(), device.getName()); device = deviceWithName == null ? device : deviceWithName.updateDevice(device); } - Device savedDevice = this.saveDeviceWithoutCredentials(device); + Device savedDevice = this.saveDeviceWithoutCredentials(device, true); deviceCredentials.setDeviceId(savedDevice.getId()); if (device.getId() == null) { deviceCredentialsService.createDeviceCredentials(savedDevice.getTenantId(), deviceCredentials); @@ -212,8 +218,8 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe return savedDevice; } - private Device doSaveDevice(Device device, String accessToken) { - Device savedDevice = this.saveDeviceWithoutCredentials(device); + private Device doSaveDevice(Device device, String accessToken, boolean doValidate) { + Device savedDevice = this.saveDeviceWithoutCredentials(device, doValidate); if (device.getId() == null) { DeviceCredentials deviceCredentials = new DeviceCredentials(); deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId())); @@ -224,9 +230,11 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe return savedDevice; } - private Device saveDeviceWithoutCredentials(Device device) { + private Device saveDeviceWithoutCredentials(Device device, boolean doValidate) { log.trace("Executing saveDevice [{}]", device); - deviceValidator.validate(device, Device::getTenantId); + if (doValidate) { + deviceValidator.validate(device, Device::getTenantId); + } try { DeviceProfile deviceProfile; if (device.getDeviceProfileId() == null) { @@ -542,7 +550,7 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe device.setTenantId(tenantId); device.setCustomerId(null); - return doSaveDevice(device, null); + return doSaveDevice(device, null, true); } @Override