diff --git a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java index a2dd53bd82..d2d9b7ae10 100644 --- a/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/plugin/PluginProcessingContext.java @@ -62,6 +62,7 @@ public final class PluginProcessingContext implements PluginContext { private static final Executor executor = Executors.newSingleThreadExecutor(); public static final String CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "Customer user is not allowed to perform this operation!"; public static final String SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION = "System administrator is not allowed to perform this operation!"; + public static final String DEVICE_WITH_REQUESTED_ID_NOT_FOUND = "Device with requested id wasn't found!"; private final SharedPluginProcessingContext pluginCtx; private final Optional securityCtx; @@ -309,7 +310,7 @@ public final class PluginProcessingContext implements PluginContext { ListenableFuture deviceFuture = pluginCtx.deviceService.findDeviceByIdAsync(new DeviceId(entityId.getId())); Futures.addCallback(deviceFuture, getCallback(callback, device -> { if (device == null) { - return ValidationResult.entityNotFound("Device with requested id wasn't found!"); + return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND); } else { if (!device.getTenantId().equals(ctx.getTenantId())) { return ValidationResult.accessDenied("Device doesn't belong to the current Tenant!"); diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java index 19e4329c33..8d68bf81b8 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractControllerTest.java @@ -106,6 +106,11 @@ public abstract class AbstractControllerTest { protected static final String CUSTOMER_USER_EMAIL = "testcustomer@thingsboard.org"; private static final String CUSTOMER_USER_PASSWORD = "customer"; + /** See {@link org.springframework.test.web.servlet.DefaultMvcResult#getAsyncResult(long)} + * and {@link org.springframework.mock.web.MockAsyncContext#getTimeout()} + */ + private static final long DEFAULT_TIMEOUT = -1L; + protected MediaType contentType = new MediaType(MediaType.APPLICATION_JSON.getType(), MediaType.APPLICATION_JSON.getSubtype(), Charset.forName("utf8")); @@ -336,7 +341,7 @@ public abstract class AbstractControllerTest { } protected T doPost(String urlTemplate, T content, Class responseClass, ResultMatcher resultMatcher, String... params) throws Exception { - return readResponse(doPost(urlTemplate, params).andExpect(resultMatcher), responseClass); + return readResponse(doPost(urlTemplate, content, params).andExpect(resultMatcher), responseClass); } protected T doPost(String urlTemplate, T content, Class responseClass, String... params) throws Exception { @@ -344,7 +349,11 @@ public abstract class AbstractControllerTest { } protected T doPostAsync(String urlTemplate, T content, Class responseClass, ResultMatcher resultMatcher, String... params) throws Exception { - return readResponse(doPostAsync(urlTemplate, content, params).andExpect(resultMatcher), responseClass); + return readResponse(doPostAsync(urlTemplate, content, DEFAULT_TIMEOUT, params).andExpect(resultMatcher), responseClass); + } + + protected T doPostAsync(String urlTemplate, T content, Class responseClass, ResultMatcher resultMatcher, Long timeout, String... params) throws Exception { + return readResponse(doPostAsync(urlTemplate, content, timeout, params).andExpect(resultMatcher), responseClass); } protected T doDelete(String urlTemplate, Class responseClass, String... params) throws Exception { @@ -366,12 +375,13 @@ public abstract class AbstractControllerTest { return mockMvc.perform(postRequest); } - protected ResultActions doPostAsync(String urlTemplate, T content, String... params) throws Exception { + protected ResultActions doPostAsync(String urlTemplate, T content, Long timeout, String... params) throws Exception { MockHttpServletRequestBuilder postRequest = post(urlTemplate); setJwtToken(postRequest); String json = json(content); postRequest.contentType(contentType).content(json); MvcResult result = mockMvc.perform(postRequest).andReturn(); + result.getAsyncResult(timeout); return mockMvc.perform(asyncDispatch(result)); } @@ -384,8 +394,8 @@ public abstract class AbstractControllerTest { protected void populateParams(MockHttpServletRequestBuilder request, String... params) { if (params != null && params.length > 0) { - Assert.assertEquals(params.length % 2, 0); - MultiValueMap paramsMap = new LinkedMultiValueMap(); + Assert.assertEquals(0, params.length % 2); + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); for (int i = 0; i < params.length; i += 2) { paramsMap.add(params[i], params[i + 1]); } diff --git a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java index ac474b818e..8b4332c9f0 100644 --- a/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/mqtt/rpc/AbstractMqttServerSideRpcIntegrationTest.java @@ -15,21 +15,23 @@ */ package org.thingsboard.server.mqtt.rpc; +import java.util.Arrays; + +import com.datastax.driver.core.utils.UUIDs; +import com.fasterxml.jackson.core.type.TypeReference; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.*; import org.junit.*; -import org.springframework.http.HttpStatus; -import org.springframework.web.client.HttpClientErrorException; +import org.thingsboard.server.actors.plugin.PluginProcessingContext; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.page.TextPageData; +import org.thingsboard.server.common.data.plugin.PluginMetaData; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.controller.AbstractControllerTest; -import org.thingsboard.server.dao.service.DaoNoSqlTest; - -import java.util.UUID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -42,15 +44,19 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractControllerTest { private static final String MQTT_URL = "tcp://localhost:1883"; - private static final String FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED = "HttpClientErrorException expected, but not encountered"; + private static final Long TIME_TO_HANDLE_REQUEST = 500L; private Tenant savedTenant; private User tenantAdmin; + private Long asyncContextTimeoutToUseRpcPlugin; + @Before public void beforeTest() throws Exception { loginSysAdmin(); + asyncContextTimeoutToUseRpcPlugin = getAsyncContextTimeoutToUseRpcPlugin(); + Tenant tenant = new Tenant(); tenant.setTitle("My tenant"); savedTenant = doPost("/api/tenant", tenant, Tenant.class); @@ -70,8 +76,7 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC public void afterTest() throws Exception { loginSysAdmin(); if (savedTenant != null) { - doDelete("/api/tenant/" + savedTenant.getId().getId().toString()) - .andExpect(status().isOk()); + doDelete("/api/tenant/" + savedTenant.getId().getId().toString()).andExpect(status().isOk()); } } @@ -102,7 +107,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC } @Test - @Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 408 but was: 200 public void testServerMqttOneWayRpcDeviceOffline() throws Exception { Device device = new Device(); device.setName("Test One-Way Server-Side RPC Device Offline"); @@ -115,29 +119,19 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; String deviceId = savedDevice.getId().getId().toString(); - try { - doPost("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().is(408)); - Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED); - } catch (HttpClientErrorException e) { - log.error(e.getMessage(), e); - Assert.assertEquals(HttpStatus.REQUEST_TIMEOUT, e.getStatusCode()); - Assert.assertEquals("408 null", e.getMessage()); - } + + doPostAsync("/api/plugins/rpc/oneway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), + asyncContextTimeoutToUseRpcPlugin); } @Test - @Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 400 (404?) but was: 401 public void testServerMqttOneWayRpcDeviceDoesNotExist() throws Exception { String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; - String nonExistentDeviceId = UUID.randomUUID().toString(); - try { - doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class, status().is(400)); - Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED); - } catch (HttpClientErrorException e) { - log.error(e.getMessage(), e); - Assert.assertEquals(HttpStatus.BAD_REQUEST, e.getStatusCode()); - Assert.assertEquals("400 null", e.getMessage()); - } + String nonExistentDeviceId = UUIDs.timeBased().toString(); + + String result = doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class, + status().isNotFound()); + Assert.assertEquals(PluginProcessingContext.DEVICE_WITH_REQUESTED_ID_NOT_FOUND, result); } @Test @@ -168,7 +162,6 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC } @Test - @Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 408 but was: 200 public void testServerMqttTwoWayRpcDeviceOffline() throws Exception { Device device = new Device(); device.setName("Test Two-Way Server-Side RPC Device Offline"); @@ -181,29 +174,19 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; String deviceId = savedDevice.getId().getId().toString(); - try { - doPost("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().is(408)); - Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED); - } catch (HttpClientErrorException e) { - log.error(e.getMessage(), e); - Assert.assertEquals(HttpStatus.REQUEST_TIMEOUT, e.getStatusCode()); - Assert.assertEquals("408 null", e.getMessage()); - } + + doPostAsync("/api/plugins/rpc/twoway/" + deviceId, setGpioRequest, String.class, status().isRequestTimeout(), + asyncContextTimeoutToUseRpcPlugin); } @Test - @Ignore // TODO: figure out the right error code for this case. Ignored due to failure: expected 400 (404?) but was: 401 public void testServerMqttTwoWayRpcDeviceDoesNotExist() throws Exception { String setGpioRequest = "{\"method\":\"setGpio\",\"params\":{\"pin\": \"23\",\"value\": 1}}"; - String nonExistentDeviceId = UUID.randomUUID().toString(); - try { - doPostAsync("/api/plugins/rpc/oneway/" + nonExistentDeviceId, setGpioRequest, String.class, status().is(400)); - Assert.fail(FAIL_MSG_IF_HTTP_CLIENT_ERROR_NOT_ENCOUNTERED); - } catch (HttpClientErrorException e) { - log.error(e.getMessage(), e); - Assert.assertEquals(HttpStatus.BAD_REQUEST, e.getStatusCode()); - Assert.assertEquals("400 null", e.getMessage()); - } + String nonExistentDeviceId = UUIDs.timeBased().toString(); + + String result = doPostAsync("/api/plugins/rpc/twoway/" + nonExistentDeviceId, setGpioRequest, String.class, + status().isNotFound()); + Assert.assertEquals(PluginProcessingContext.DEVICE_WITH_REQUESTED_ID_NOT_FOUND, result); } private Device getSavedDevice(Device device) throws Exception { @@ -214,6 +197,13 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC return doGet("/api/device/" + savedDevice.getId().getId().toString() + "/credentials", DeviceCredentials.class); } + private Long getAsyncContextTimeoutToUseRpcPlugin() throws Exception { + TextPageData plugins = doGetTyped("/api/plugin/system?limit=1&textSearch=system rpc plugin", + new TypeReference>(){}); + Long systemRpcPluginTimeout = plugins.getData().iterator().next().getConfiguration().get("defaultTimeout").asLong(); + return systemRpcPluginTimeout + TIME_TO_HANDLE_REQUEST; + } + private static class TestMqttCallback implements MqttCallback { private final MqttAsyncClient client; @@ -228,10 +218,10 @@ public abstract class AbstractMqttServerSideRpcIntegrationTest extends AbstractC @Override public void messageArrived(String requestTopic, MqttMessage mqttMessage) throws Exception { - log.info("Message Arrived: " + mqttMessage.getPayload().toString()); + log.info("Message Arrived: " + Arrays.toString(mqttMessage.getPayload())); MqttMessage message = new MqttMessage(); String responseTopic = requestTopic.replace("request", "response"); - message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes()); + message.setPayload("{\"value1\":\"A\", \"value2\":\"B\"}".getBytes("UTF-8")); client.publish(responseTopic, message); }