diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index 38239c9a8b..817abe4018 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -74,6 +74,7 @@ import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.device.DeviceBulkImportService; +import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; import org.thingsboard.server.service.importing.BulkImportRequest; import org.thingsboard.server.service.importing.BulkImportResult; import org.thingsboard.server.service.security.model.SecurityUser; @@ -128,6 +129,8 @@ public class DeviceController extends BaseController { private final DeviceBulkImportService deviceBulkImportService; + private final GatewayNotificationsService gatewayNotificationsService; + @ApiOperation(value = "Get Device (getDeviceById)", notes = "Fetch the Device object based on the provided Device Id. " + "If the user has the authority of 'TENANT_ADMIN', the server checks that the device is owned by the same tenant. " + @@ -263,6 +266,7 @@ public class DeviceController extends BaseController { deviceService.deleteDevice(getCurrentUser().getTenantId(), deviceId); + gatewayNotificationsService.onDeviceDeleted(device); tbClusterService.onDeviceDeleted(device, null); logEntityAction(deviceId, device, diff --git a/application/src/main/java/org/thingsboard/server/service/gateway_device/DefaultGatewayNotificationsService.java b/application/src/main/java/org/thingsboard/server/service/gateway_device/DefaultGatewayNotificationsService.java new file mode 100644 index 0000000000..1038bb342a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/gateway_device/DefaultGatewayNotificationsService.java @@ -0,0 +1,106 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.gateway_device; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Service; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; +import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; + +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +@Slf4j +@Service +@RequiredArgsConstructor +public class DefaultGatewayNotificationsService implements GatewayNotificationsService { + + private final static String DEVICE_RENAMED_METHOD_NAME = "gateway_device_renamed"; + private final static String DEVICE_DELETED_METHOD_NAME = "gateway_device_deleted"; + private final static Long rpcTimeout = TimeUnit.DAYS.toMillis(1); + @Lazy + @Autowired + private TbCoreDeviceRpcService deviceRpcService; + + @Override + public void onDeviceUpdated(Device device, Device oldDevice) { + Optional gatewayDeviceId = getGatewayDeviceIdFromAdditionalInfoInDevice(device); + if (gatewayDeviceId.isPresent()) { + ObjectNode renamedDeviceNode = JacksonUtil.newObjectNode(); + renamedDeviceNode.put(oldDevice.getName(), device.getName()); + ToDeviceRpcRequest rpcRequest = formDeviceToGatewayRPCRequest(device.getTenantId(), gatewayDeviceId.get(), renamedDeviceNode, DEVICE_RENAMED_METHOD_NAME); + deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> { + log.trace("Device renamed RPC with id: [{}] processed to gateway device with id: [{}], old device name: [{}], new device name: [{}]", + rpcRequest.getId(), gatewayDeviceId, oldDevice.getName(), device.getName()); + }, null); + } + } + + @Override + public void onDeviceDeleted(Device device) { + Optional gatewayDeviceId = getGatewayDeviceIdFromAdditionalInfoInDevice(device); + if (gatewayDeviceId.isPresent()) { + TextNode deletedDeviceNode = new TextNode(device.getName()); + ToDeviceRpcRequest rpcRequest = formDeviceToGatewayRPCRequest(device.getTenantId(), gatewayDeviceId.get(), deletedDeviceNode, DEVICE_DELETED_METHOD_NAME); + deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> { + log.trace("Device deleted RPC with id: [{}] processed to gateway device with id: [{}], deleted device name: [{}]", + rpcRequest.getId(), gatewayDeviceId, device.getName()); + }, null); + } + } + + private ToDeviceRpcRequest formDeviceToGatewayRPCRequest(TenantId tenantId, DeviceId gatewayDeviceId, JsonNode deviceDataNode, String method) { + ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(method, JacksonUtil.toString(deviceDataNode)); + long expTime = System.currentTimeMillis() + rpcTimeout; + UUID rpcRequestUUID = UUID.randomUUID(); + return new ToDeviceRpcRequest(rpcRequestUUID, + tenantId, + gatewayDeviceId, + true, + expTime, + body, + true, + 3, + null + ); + } + + private Optional getGatewayDeviceIdFromAdditionalInfoInDevice(Device device) { + JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); + if (deviceAdditionalInfo != null && deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_GATEWAY)) { + try { + JsonNode lastConnectedGatewayIdNode = deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_GATEWAY); + return Optional.of(new DeviceId(UUID.fromString(lastConnectedGatewayIdNode.asText()))); + } catch (RuntimeException e) { + log.debug("[{}] Failed to decode connected gateway: {}", device.getId(), deviceAdditionalInfo); + } + } + return Optional.empty(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/gateway_device/GatewayNotificationsService.java b/application/src/main/java/org/thingsboard/server/service/gateway_device/GatewayNotificationsService.java new file mode 100644 index 0000000000..4d7bf36caf --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/gateway_device/GatewayNotificationsService.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.gateway_device; + +import org.thingsboard.server.common.data.Device; + +public interface GatewayNotificationsService { + + void onDeviceUpdated(Device device, Device oldDevice); + + void onDeviceDeleted(Device device); +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 834a59811f..0d8c65dc39 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -61,6 +61,7 @@ import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; +import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; @@ -91,6 +92,7 @@ public class DefaultTbClusterService implements TbClusterService { private final DataDecodingEncodingService encodingService; private final TbDeviceProfileCache deviceProfileCache; private final OtaPackageStateService otaPackageStateService; + private final GatewayNotificationsService gatewayNotificationsService; @Override public void pushMsgToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, TbQueueCallback callback) { @@ -399,8 +401,14 @@ public class DefaultTbClusterService implements TbClusterService { public void onDeviceUpdated(Device device, Device old, boolean notifyEdge) { var created = old == null; broadcastEntityChangeToTransport(device.getTenantId(), device.getId(), device, null); - if (old != null && (!device.getName().equals(old.getName()) || !device.getType().equals(old.getType()))) { - pushMsgToCore(new DeviceNameOrTypeUpdateMsg(device.getTenantId(), device.getId(), device.getName(), device.getType()), null); + if (old != null) { + boolean deviceNameChanged = !device.getName().equals(old.getName()); + if (deviceNameChanged) { + gatewayNotificationsService.onDeviceUpdated(device, old); + } + if (deviceNameChanged || !device.getType().equals(old.getType())) { + pushMsgToCore(new DeviceNameOrTypeUpdateMsg(device.getTenantId(), device.getId(), device.getName(), device.getType()), null); + } } broadcastEntityStateChangeEvent(device.getTenantId(), device.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); sendDeviceStateServiceEvent(device.getTenantId(), device.getId(), created, !created, false); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java index 2b2ade02ee..99e013d6d0 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java @@ -22,10 +22,12 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -34,7 +36,6 @@ import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.service.security.model.SecurityUser; import javax.annotation.PostConstruct; @@ -183,7 +184,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { entityNode.put(DataConstants.ADDITIONAL_INFO, msg.getAdditionalInfo()); try { - TbMsg tbMsg = TbMsg.newMsg(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), currentUser.getCustomerId(), metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); + TbMsg tbMsg = TbMsg.newMsg(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), Optional.ofNullable(currentUser).map(User::getCustomerId).orElse(null), metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); clusterService.pushMsgToRuleEngine(msg.getTenantId(), msg.getDeviceId(), tbMsg, null); } catch (JsonProcessingException e) { throw new RuntimeException(e); diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index 80fb062c90..864ad3a4b8 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.transport; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; @@ -27,6 +28,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cache.ota.OtaPackageDataCache; +import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; @@ -95,9 +97,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; -import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.service.resource.TbResourceService; -import org.thingsboard.server.service.state.DeviceStateService; import java.util.Optional; import java.util.UUID; @@ -127,7 +127,6 @@ public class DefaultTransportApiService implements TransportApiService { private final DeviceService deviceService; private final RelationService relationService; private final DeviceCredentialsService deviceCredentialsService; - private final DeviceStateService deviceStateService; private final DbCallbackExecutorService dbCallbackExecutorService; private final TbClusterService tbClusterService; private final DataDecodingEncodingService dataDecodingEncodingService; @@ -138,6 +137,10 @@ public class DefaultTransportApiService implements TransportApiService { private final ConcurrentMap deviceCreationLocks = new ConcurrentHashMap<>(); + private static boolean checkIsMqttCredentials(DeviceCredentials credentials) { + return credentials != null && DeviceCredentialsType.MQTT_BASIC.equals(credentials.getCredentialsType()); + } + @Override public ListenableFuture> handle(TbProtoQueueMsg tbProtoQueueMsg) { TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue(); @@ -234,10 +237,6 @@ public class DefaultTransportApiService implements TransportApiService { return getEmptyTransportApiResponseFuture(); } - private static boolean checkIsMqttCredentials(DeviceCredentials credentials) { - return credentials != null && DeviceCredentialsType.MQTT_BASIC.equals(credentials.getCredentialsType()); - } - private DeviceCredentials checkMqttCredentials(TransportProtos.ValidateBasicMqttCredRequestMsg clientCred, String credId) { return checkMqttCredentials(clientCred, deviceCredentialsService.findDeviceCredentialsByCredentialsId(credId)); } @@ -286,6 +285,9 @@ public class DefaultTransportApiService implements TransportApiService { device.setCustomerId(gateway.getCustomerId()); DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType()); device.setDeviceProfileId(deviceProfile.getId()); + ObjectNode additionalInfo = JacksonUtil.newObjectNode(); + additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString()); + device.setAdditionalInfo(additionalInfo); Device savedDevice = deviceService.saveDevice(device); tbClusterService.onDeviceUpdated(savedDevice, null); device = savedDevice; @@ -303,6 +305,19 @@ public class DefaultTransportApiService implements TransportApiService { ObjectNode entityNode = mapper.valueToTree(device); TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, mapper.writeValueAsString(entityNode)); tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null); + } else { + JsonNode deviceAdditionalInfo = device.getAdditionalInfo(); + if (deviceAdditionalInfo == null) { + deviceAdditionalInfo = JacksonUtil.newObjectNode(); + } + if (deviceAdditionalInfo.isObject() && + (!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_GATEWAY) + || !gatewayId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_GATEWAY).asText()))) { + ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo; + newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString()); + Device savedDevice = deviceService.saveDevice(device); + tbClusterService.onDeviceUpdated(savedDevice, device); + } } GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder() .setDeviceInfo(getDeviceInfoProto(device)); @@ -348,7 +363,8 @@ public class DefaultTransportApiService implements TransportApiService { dbCallbackExecutorService); } - private TransportApiResponseMsg getTransportApiResponseMsg(DeviceCredentials deviceCredentials, TransportProtos.ResponseStatus status) { + private TransportApiResponseMsg getTransportApiResponseMsg(DeviceCredentials + deviceCredentials, TransportProtos.ResponseStatus status) { if (!status.equals(TransportProtos.ResponseStatus.SUCCESS)) { return TransportApiResponseMsg.newBuilder().setProvisionDeviceResponseMsg(TransportProtos.ProvisionDeviceResponseMsg.newBuilder().setStatus(status).build()).build(); } @@ -591,7 +607,8 @@ public class DefaultTransportApiService implements TransportApiService { .build()); } - private ListenableFuture handleRegistration(TransportProtos.LwM2MRegistrationRequestMsg msg) { + private ListenableFuture handleRegistration + (TransportProtos.LwM2MRegistrationRequestMsg msg) { TenantId tenantId = TenantId.fromUUID(UUID.fromString(msg.getTenantId())); String deviceName = msg.getEndpoint(); Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(deviceName, id -> new ReentrantLock()); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 707a50dfb3..40f6c13794 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -116,4 +116,5 @@ public class DataConstants { public static final String EDGE_MSG_SOURCE = "edge"; public static final String MSG_SOURCE_KEY = "source"; + public static final String LAST_CONNECTED_GATEWAY = "lastConnectedGateway"; }