Merge pull request #5796 from desoliture1/gatewayFix
[WIP][3.3.3] MQTT Gateway API - fix response data on attributes request
This commit is contained in:
		
						commit
						a3f3578b65
					
				@ -452,6 +452,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
 | 
				
			|||||||
                            .setRequestId(requestId)
 | 
					                            .setRequestId(requestId)
 | 
				
			||||||
                            .setSharedStateMsg(true)
 | 
					                            .setSharedStateMsg(true)
 | 
				
			||||||
                            .addAllSharedAttributeList(toTsKvProtos(result))
 | 
					                            .addAllSharedAttributeList(toTsKvProtos(result))
 | 
				
			||||||
 | 
					                            .setIsMultipleAttributesRequest(request.getSharedAttributeNamesCount() > 1)
 | 
				
			||||||
                            .build();
 | 
					                            .build();
 | 
				
			||||||
                    sendToTransport(responseMsg, sessionInfo);
 | 
					                    sendToTransport(responseMsg, sessionInfo);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
@ -473,6 +474,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
 | 
				
			|||||||
                            .setRequestId(requestId)
 | 
					                            .setRequestId(requestId)
 | 
				
			||||||
                            .addAllClientAttributeList(toTsKvProtos(result.get(0)))
 | 
					                            .addAllClientAttributeList(toTsKvProtos(result.get(0)))
 | 
				
			||||||
                            .addAllSharedAttributeList(toTsKvProtos(result.get(1)))
 | 
					                            .addAllSharedAttributeList(toTsKvProtos(result.get(1)))
 | 
				
			||||||
 | 
					                            .setIsMultipleAttributesRequest(
 | 
				
			||||||
 | 
					                                    request.getSharedAttributeNamesCount() + request.getClientAttributeNamesCount() > 1)
 | 
				
			||||||
                            .build();
 | 
					                            .build();
 | 
				
			||||||
                    sendToTransport(responseMsg, sessionInfo);
 | 
					                    sendToTransport(responseMsg, sessionInfo);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
 | 
				
			|||||||
@ -149,6 +149,7 @@ message GetAttributeResponseMsg {
 | 
				
			|||||||
  int32 requestId = 1;
 | 
					  int32 requestId = 1;
 | 
				
			||||||
  repeated TsKvProto clientAttributeList = 2;
 | 
					  repeated TsKvProto clientAttributeList = 2;
 | 
				
			||||||
  repeated TsKvProto sharedAttributeList = 3;
 | 
					  repeated TsKvProto sharedAttributeList = 3;
 | 
				
			||||||
 | 
					  bool isMultipleAttributesRequest = 4;
 | 
				
			||||||
  string error = 5;
 | 
					  string error = 5;
 | 
				
			||||||
  bool sharedStateMsg = 6;
 | 
					  bool sharedStateMsg = 6;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -313,16 +313,18 @@ public class JsonConverter {
 | 
				
			|||||||
        return result;
 | 
					        return result;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public static JsonObject getJsonObjectForGateway(String deviceName, TransportProtos.GetAttributeResponseMsg
 | 
					    public static JsonObject getJsonObjectForGateway(
 | 
				
			||||||
            responseMsg) {
 | 
					            String deviceName,
 | 
				
			||||||
 | 
					            TransportProtos.GetAttributeResponseMsg responseMsg
 | 
				
			||||||
 | 
					    ) {
 | 
				
			||||||
        JsonObject result = new JsonObject();
 | 
					        JsonObject result = new JsonObject();
 | 
				
			||||||
        result.addProperty("id", responseMsg.getRequestId());
 | 
					        result.addProperty("id", responseMsg.getRequestId());
 | 
				
			||||||
        result.addProperty(DEVICE_PROPERTY, deviceName);
 | 
					        result.addProperty(DEVICE_PROPERTY, deviceName);
 | 
				
			||||||
        if (responseMsg.getClientAttributeListCount() > 0) {
 | 
					        if (responseMsg.getClientAttributeListCount() > 0) {
 | 
				
			||||||
            addValues(result, responseMsg.getClientAttributeListList());
 | 
					            addValues(result, responseMsg.getClientAttributeListList(), responseMsg.getIsMultipleAttributesRequest());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (responseMsg.getSharedAttributeListCount() > 0) {
 | 
					        if (responseMsg.getSharedAttributeListCount() > 0) {
 | 
				
			||||||
            addValues(result, responseMsg.getSharedAttributeListList());
 | 
					            addValues(result, responseMsg.getSharedAttributeListList(), responseMsg.getIsMultipleAttributesRequest());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        return result;
 | 
					        return result;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -335,8 +337,8 @@ public class JsonConverter {
 | 
				
			|||||||
        return result;
 | 
					        return result;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private static void addValues(JsonObject result, List<TransportProtos.TsKvProto> kvList) {
 | 
					    private static void addValues(JsonObject result, List<TransportProtos.TsKvProto> kvList, boolean multipleAttrKeysRequested) {
 | 
				
			||||||
        if (kvList.size() == 1) {
 | 
					        if (kvList.size() == 1 && !multipleAttrKeysRequested) {
 | 
				
			||||||
            addValueToJson(result, "value", kvList.get(0).getKv());
 | 
					            addValueToJson(result, "value", kvList.get(0).getKv());
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            JsonObject values;
 | 
					            JsonObject values;
 | 
				
			||||||
 | 
				
			|||||||
@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 | 
				
			|||||||
import com.google.common.collect.ImmutableMap;
 | 
					import com.google.common.collect.ImmutableMap;
 | 
				
			||||||
import com.google.gson.JsonArray;
 | 
					import com.google.gson.JsonArray;
 | 
				
			||||||
import com.google.gson.JsonObject;
 | 
					import com.google.gson.JsonObject;
 | 
				
			||||||
 | 
					import com.google.gson.JsonParser;
 | 
				
			||||||
import lombok.extern.slf4j.Slf4j;
 | 
					import lombok.extern.slf4j.Slf4j;
 | 
				
			||||||
import org.apache.cassandra.cql3.Json;
 | 
					import org.apache.cassandra.cql3.Json;
 | 
				
			||||||
import org.apache.commons.lang3.RandomStringUtils;
 | 
					import org.apache.commons.lang3.RandomStringUtils;
 | 
				
			||||||
@ -69,6 +70,7 @@ public abstract class AbstractContainerTest {
 | 
				
			|||||||
    protected static String TB_TOKEN;
 | 
					    protected static String TB_TOKEN;
 | 
				
			||||||
    protected static RestClient restClient;
 | 
					    protected static RestClient restClient;
 | 
				
			||||||
    protected ObjectMapper mapper = new ObjectMapper();
 | 
					    protected ObjectMapper mapper = new ObjectMapper();
 | 
				
			||||||
 | 
					    protected JsonParser jsonParser = new JsonParser();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @BeforeClass
 | 
					    @BeforeClass
 | 
				
			||||||
    public static void before() throws Exception {
 | 
					    public static void before() throws Exception {
 | 
				
			||||||
 | 
				
			|||||||
@ -20,6 +20,7 @@ import com.google.common.collect.Sets;
 | 
				
			|||||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
					import com.google.common.util.concurrent.ListenableFuture;
 | 
				
			||||||
import com.google.common.util.concurrent.ListeningExecutorService;
 | 
					import com.google.common.util.concurrent.ListeningExecutorService;
 | 
				
			||||||
import com.google.common.util.concurrent.MoreExecutors;
 | 
					import com.google.common.util.concurrent.MoreExecutors;
 | 
				
			||||||
 | 
					import com.google.gson.JsonArray;
 | 
				
			||||||
import com.google.gson.JsonObject;
 | 
					import com.google.gson.JsonObject;
 | 
				
			||||||
import com.google.gson.JsonParser;
 | 
					import com.google.gson.JsonParser;
 | 
				
			||||||
import io.netty.buffer.ByteBuf;
 | 
					import io.netty.buffer.ByteBuf;
 | 
				
			||||||
@ -151,6 +152,75 @@ public class MqttGatewayClientTest extends AbstractContainerTest {
 | 
				
			|||||||
        Assert.assertTrue(verify(actualLatestTelemetry, "attr4", Long.toString(73)));
 | 
					        Assert.assertTrue(verify(actualLatestTelemetry, "attr4", Long.toString(73)));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    public void responseDataOnAttributesRequestCheck() throws Exception {
 | 
				
			||||||
 | 
					        Optional<DeviceCredentials> createdDeviceCredentials = restClient.getDeviceCredentialsByDeviceId(createdDevice.getId());
 | 
				
			||||||
 | 
					        Assert.assertTrue(createdDeviceCredentials.isPresent());
 | 
				
			||||||
 | 
					        JsonObject sharedAttributes = new JsonObject();
 | 
				
			||||||
 | 
					        sharedAttributes.addProperty("attr1", "value1");
 | 
				
			||||||
 | 
					        sharedAttributes.addProperty("attr2", true);
 | 
				
			||||||
 | 
					        sharedAttributes.addProperty("attr3", 42.0);
 | 
				
			||||||
 | 
					        sharedAttributes.addProperty("attr4", 73);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get();
 | 
				
			||||||
 | 
					        ResponseEntity sharedAttributesResponse = restClient.getRestTemplate()
 | 
				
			||||||
 | 
					                .postForEntity(HTTPS_URL + "/api/plugins/telemetry/DEVICE/{deviceId}/SHARED_SCOPE",
 | 
				
			||||||
 | 
					                        mapper.readTree(sharedAttributes.toString()), ResponseEntity.class,
 | 
				
			||||||
 | 
					                        createdDevice.getId());
 | 
				
			||||||
 | 
					        Assert.assertTrue(sharedAttributesResponse.getStatusCode().is2xxSuccessful());
 | 
				
			||||||
 | 
					        var event = listener.getEvents().poll(10, TimeUnit.SECONDS);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        JsonObject requestData = new JsonObject();
 | 
				
			||||||
 | 
					        requestData.addProperty("id", 1);
 | 
				
			||||||
 | 
					        requestData.addProperty("device", createdDevice.getName());
 | 
				
			||||||
 | 
					        requestData.addProperty("client", false);
 | 
				
			||||||
 | 
					        requestData.addProperty("key", "attr1");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get();
 | 
				
			||||||
 | 
					        mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get();
 | 
				
			||||||
 | 
					        event = listener.getEvents().poll(10, TimeUnit.SECONDS);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        JsonObject responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject();
 | 
				
			||||||
 | 
					        Assert.assertTrue(responseData.has("value"));
 | 
				
			||||||
 | 
					        Assert.assertEquals(sharedAttributes.get("attr1").getAsString(), responseData.get("value").getAsString());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        requestData = new JsonObject();
 | 
				
			||||||
 | 
					        requestData.addProperty("id", 1);
 | 
				
			||||||
 | 
					        requestData.addProperty("device", createdDevice.getName());
 | 
				
			||||||
 | 
					        requestData.addProperty("client", false);
 | 
				
			||||||
 | 
					        JsonArray keys = new JsonArray();
 | 
				
			||||||
 | 
					        keys.add("attr1");
 | 
				
			||||||
 | 
					        keys.add("attr2");
 | 
				
			||||||
 | 
					        requestData.add("keys", keys);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get();
 | 
				
			||||||
 | 
					        mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get();
 | 
				
			||||||
 | 
					        event = listener.getEvents().poll(10, TimeUnit.SECONDS);
 | 
				
			||||||
 | 
					        responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Assert.assertTrue(responseData.has("values"));
 | 
				
			||||||
 | 
					        Assert.assertEquals(sharedAttributes.get("attr1").getAsString(), responseData.get("values").getAsJsonObject().get("attr1").getAsString());
 | 
				
			||||||
 | 
					        Assert.assertEquals(sharedAttributes.get("attr2").getAsString(), responseData.get("values").getAsJsonObject().get("attr2").getAsString());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        requestData = new JsonObject();
 | 
				
			||||||
 | 
					        requestData.addProperty("id", 1);
 | 
				
			||||||
 | 
					        requestData.addProperty("device", createdDevice.getName());
 | 
				
			||||||
 | 
					        requestData.addProperty("client", false);
 | 
				
			||||||
 | 
					        keys = new JsonArray();
 | 
				
			||||||
 | 
					        keys.add("attr1");
 | 
				
			||||||
 | 
					        keys.add("undefined");
 | 
				
			||||||
 | 
					        requestData.add("keys", keys);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        mqttClient.on("v1/gateway/attributes/response", listener, MqttQoS.AT_LEAST_ONCE).get();
 | 
				
			||||||
 | 
					        mqttClient.publish("v1/gateway/attributes/request", Unpooled.wrappedBuffer(requestData.toString().getBytes())).get();
 | 
				
			||||||
 | 
					        event = listener.getEvents().poll(10, TimeUnit.SECONDS);
 | 
				
			||||||
 | 
					        responseData = jsonParser.parse(Objects.requireNonNull(event).getMessage()).getAsJsonObject();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Assert.assertTrue(responseData.has("values"));
 | 
				
			||||||
 | 
					        Assert.assertEquals(sharedAttributes.get("attr1").getAsString(), responseData.get("values").getAsJsonObject().get("attr1").getAsString());
 | 
				
			||||||
 | 
					        Assert.assertEquals(1, responseData.get("values").getAsJsonObject().entrySet().size());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Test
 | 
					    @Test
 | 
				
			||||||
    public void requestAttributeValuesFromServer() throws Exception {
 | 
					    public void requestAttributeValuesFromServer() throws Exception {
 | 
				
			||||||
        WsClient wsClient = subscribeToWebSocket(createdDevice.getId(), "CLIENT_SCOPE", CmdsType.ATTR_SUB_CMDS);
 | 
					        WsClient wsClient = subscribeToWebSocket(createdDevice.getId(), "CLIENT_SCOPE", CmdsType.ATTR_SUB_CMDS);
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user