Merge branch 'develop/3.5.2' into feature/widget-config

This commit is contained in:
Igor Kulikov 2023-05-30 12:38:22 +03:00
commit d30ed1243c
17 changed files with 325 additions and 157 deletions

View File

@ -194,17 +194,25 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
processMqttMsg(ctx, message); processMqttMsg(ctx, message);
} else { } else {
log.error("[{}] Message decoding failed: {}", sessionId, message.decoderResult().cause().getMessage()); log.error("[{}] Message decoding failed: {}", sessionId, message.decoderResult().cause().getMessage());
ctx.close(); closeCtx(ctx);
} }
} else { } else {
log.debug("[{}] Received non mqtt message: {}", sessionId, msg.getClass().getSimpleName()); log.debug("[{}] Received non mqtt message: {}", sessionId, msg.getClass().getSimpleName());
ctx.close(); closeCtx(ctx);
} }
} finally { } finally {
ReferenceCountUtil.safeRelease(msg); ReferenceCountUtil.safeRelease(msg);
} }
} }
private void closeCtx(ChannelHandlerContext ctx) {
if (!rpcAwaitingAck.isEmpty()) {
log.debug("[{}] Cleanup rpc awaiting ack map due to session close!", sessionId);
rpcAwaitingAck.clear();
}
ctx.close();
}
InetSocketAddress getAddress(ChannelHandlerContext ctx) { InetSocketAddress getAddress(ChannelHandlerContext ctx) {
var address = ctx.channel().attr(MqttTransportService.ADDRESS).get(); var address = ctx.channel().attr(MqttTransportService.ADDRESS).get();
if (address == null) { if (address == null) {
@ -221,7 +229,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) { void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
if (msg.fixedHeader() == null) { if (msg.fixedHeader() == null) {
log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort()); log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
ctx.close(); closeCtx(ctx);
return; return;
} }
deviceSessionCtx.setChannel(ctx); deviceSessionCtx.setChannel(ctx);
@ -258,21 +266,21 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
} else { } else {
log.debug("[{}] Unsupported topic for provisioning requests: {}!", sessionId, topicName); log.debug("[{}] Unsupported topic for provisioning requests: {}!", sessionId, topicName);
ctx.close(); closeCtx(ctx);
} }
} catch (RuntimeException e) { } catch (RuntimeException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
ctx.close(); closeCtx(ctx);
} catch (AdaptorException e) { } catch (AdaptorException e) {
log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
ctx.close(); closeCtx(ctx);
} }
break; break;
case PINGREQ: case PINGREQ:
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
break; break;
case DISCONNECT: case DISCONNECT:
ctx.close(); closeCtx(ctx);
break; break;
} }
} }
@ -282,7 +290,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) { if (queueSize >= context.getMessageQueueSizePerDeviceLimit()) {
log.info("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}", log.info("Closing current session because msq queue size for device {} exceed limit {} with msgQueueSize counter {} and actual queue size {}",
deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueueSize()); deviceSessionCtx.getDeviceId(), context.getMessageQueueSizePerDeviceLimit(), queueSize, deviceSessionCtx.getMsgQueueSize());
ctx.close(); closeCtx(ctx);
return; return;
} }
@ -316,7 +324,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
break; break;
case DISCONNECT: case DISCONNECT:
ctx.close(); closeCtx(ctx);
break; break;
case PUBACK: case PUBACK:
int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId(); int msgId = ((MqttPubAckMessage) msg).variableHeader().messageId();
@ -381,7 +389,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} catch (RuntimeException e) { } catch (RuntimeException e) {
log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); log.warn("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC); ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC);
ctx.close(); closeCtx(ctx);
} catch (AdaptorException e) { } catch (AdaptorException e) {
log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); log.debug("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
sendAckOrCloseSession(ctx, topicName, msgId); sendAckOrCloseSession(ctx, topicName, msgId);
@ -421,7 +429,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} catch (RuntimeException e) { } catch (RuntimeException e) {
log.error("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); log.error("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC); ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC);
ctx.close(); closeCtx(ctx);
} catch (AdaptorException | ThingsboardException | InvalidProtocolBufferException e) { } catch (AdaptorException | ThingsboardException | InvalidProtocolBufferException e) {
log.error("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e); log.error("[{}] Failed to process publish msg [{}][{}]", sessionId, topicName, msgId, e);
sendAckOrCloseSession(ctx, topicName, msgId); sendAckOrCloseSession(ctx, topicName, msgId);
@ -523,7 +531,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
ctx.writeAndFlush(createMqttPubAckMsg(deviceSessionCtx, msgId, ReturnCode.PAYLOAD_FORMAT_INVALID)); ctx.writeAndFlush(createMqttPubAckMsg(deviceSessionCtx, msgId, ReturnCode.PAYLOAD_FORMAT_INVALID));
} else { } else {
log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId); log.info("[{}] Closing current session due to invalid publish msg [{}][{}]", sessionId, topicName, msgId);
ctx.close(); closeCtx(ctx);
} }
} }
@ -579,7 +587,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override @Override
public void onError(Throwable e) { public void onError(Throwable e) {
log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e); log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
ctx.close(); closeCtx(ctx);
} }
}; };
} }
@ -615,7 +623,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onError(Throwable e) { public void onError(Throwable e) {
log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e); log.trace("[{}] Failed to publish msg: {}", sessionId, msg, e);
ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC); ack(ctx, msgId, ReturnCode.IMPLEMENTATION_SPECIFIC);
ctx.close(); closeCtx(ctx);
} }
} }
@ -650,7 +658,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
@Override @Override
public void onError(Throwable e) { public void onError(Throwable e) {
log.trace("[{}] Failed to get firmware: {}", sessionId, msg, e); log.trace("[{}] Failed to get firmware: {}", sessionId, msg, e);
ctx.close(); closeCtx(ctx);
} }
} }
@ -672,7 +680,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
deviceSessionCtx.getChannel().writeAndFlush(deviceSessionCtx deviceSessionCtx.getChannel().writeAndFlush(deviceSessionCtx
.getPayloadAdaptor() .getPayloadAdaptor()
.createMqttPublishMsg(deviceSessionCtx, MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC, error.getBytes())); .createMqttPublishMsg(deviceSessionCtx, MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC, error.getBytes()));
ctx.close(); closeCtx(ctx);
} }
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
@ -922,7 +930,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onError(Throwable e) { public void onError(Throwable e) {
log.trace("[{}] Failed to process credentials: {}", address, userName, e); log.trace("[{}] Failed to process credentials: {}", address, userName, e);
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage)); ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
ctx.close(); closeCtx(ctx);
} }
}); });
} }
@ -945,14 +953,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onError(Throwable e) { public void onError(Throwable e) {
log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e); log.trace("[{}] Failed to process credentials: {}", address, sha3Hash, e);
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage)); ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
ctx.close(); closeCtx(ctx);
} }
}); });
} catch (Exception e) { } catch (Exception e) {
context.onAuthFailure(address); context.onAuthFailure(address);
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.NOT_AUTHORIZED_5, connectMessage)); ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.NOT_AUTHORIZED_5, connectMessage));
log.trace("[{}] X509 auth failure: {}", sessionId, address, e); log.trace("[{}] X509 auth failure: {}", sessionId, address, e);
ctx.close(); closeCtx(ctx);
} }
} }
@ -1000,7 +1008,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
log.error("[{}] Unexpected Exception", sessionId, cause); log.error("[{}] Unexpected Exception", sessionId, cause);
} }
ctx.close(); closeCtx(ctx);
if (cause instanceof OutOfMemoryError) { if (cause instanceof OutOfMemoryError) {
log.error("Received critical error. Going to shutdown the service."); log.error("Received critical error. Going to shutdown the service.");
System.exit(1); System.exit(1);
@ -1082,7 +1090,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} catch (Exception e) { } catch (Exception e) {
log.trace("[{}][{}] Failed to fetch sparkplugDevice connect, sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e); log.trace("[{}][{}] Failed to fetch sparkplugDevice connect, sparkplugTopicName", sessionId, deviceSessionCtx.getDeviceInfo().getDeviceName(), e);
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage)); ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
ctx.close(); closeCtx(ctx);
} }
} }
@ -1139,7 +1147,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
} }
} }
ctx.writeAndFlush(createMqttConnAckMsg(returnCode, connectMessage)); ctx.writeAndFlush(createMqttConnAckMsg(returnCode, connectMessage));
ctx.close(); closeCtx(ctx);
} else { } else {
context.onAuthSuccess(address); context.onAuthSuccess(address);
deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
@ -1168,7 +1176,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
log.warn("[{}] Failed to submit session event", sessionId, e); log.warn("[{}] Failed to submit session event", sessionId, e);
} }
ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage)); ctx.writeAndFlush(createMqttConnAckMsg(ReturnCode.SERVER_UNAVAILABLE_5, connectMessage));
ctx.close(); closeCtx(ctx);
} }
}); });
} }
@ -1216,7 +1224,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) {
log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage());
transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); transportService.deregisterSession(deviceSessionCtx.getSessionInfo());
deviceSessionCtx.getChannel().close(); closeCtx(deviceSessionCtx.getChannel());
} }
@Override @Override
@ -1327,7 +1335,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
public void onDeviceDeleted(DeviceId deviceId) { public void onDeviceDeleted(DeviceId deviceId) {
context.onAuthFailure(address); context.onAuthFailure(address);
ChannelHandlerContext ctx = deviceSessionCtx.getChannel(); ChannelHandlerContext ctx = deviceSessionCtx.getChannel();
ctx.close(); closeCtx(ctx);
} }
public void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) { public void sendErrorRpcResponse(TransportProtos.SessionInfoProto sessionInfo, int requestId, ThingsboardErrorCode result, String errorMsg) {

View File

@ -36,7 +36,7 @@ public class TimescaleSqlInitializer {
"sql/schema-views-and-functions.sql", "sql/schema-views-and-functions.sql",
"sql/system-data.sql", "sql/system-data.sql",
"sql/system-test-psql.sql"); "sql/system-test-psql.sql");
private static final String dropAllTablesSqlFile = "sql/timescale/drop-all-tables.sql"; private static final String dropAllTablesSqlFile = "sql/psql/drop-all-tables.sql";
public static void initDb(Connection conn) { public static void initDb(Connection conn) {
cleanUpDb(conn); cleanUpDb(conn);

View File

@ -1,42 +0,0 @@
DROP TABLE IF EXISTS admin_settings;
DROP TABLE IF EXISTS entity_alarm;
DROP TABLE IF EXISTS alarm;
DROP TABLE IF EXISTS asset;
DROP TABLE IF EXISTS audit_log;
DROP TABLE IF EXISTS attribute_kv;
DROP TABLE IF EXISTS component_descriptor;
DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS device;
DROP TABLE IF EXISTS device_credentials;
DROP TABLE IF EXISTS event;
DROP TABLE IF EXISTS relation;
DROP TABLE IF EXISTS tb_user;
DROP TABLE IF EXISTS tenant;
DROP TABLE IF EXISTS ts_kv;
DROP TABLE IF EXISTS ts_kv_dictionary;
DROP TABLE IF EXISTS ts_kv_latest;
DROP TABLE IF EXISTS user_credentials;
DROP TABLE IF EXISTS widget_type;
DROP TABLE IF EXISTS widgets_bundle;
DROP TABLE IF EXISTS entity_view;
DROP TABLE IF EXISTS device_profile;
DROP TABLE IF EXISTS tenant_profile;
DROP TABLE IF EXISTS dashboard;
DROP TABLE IF EXISTS rule_node_state;
DROP TABLE IF EXISTS rule_node;
DROP TABLE IF EXISTS rule_chain;
DROP TABLE IF EXISTS oauth2_mobile;
DROP TABLE IF EXISTS oauth2_domain;
DROP TABLE IF EXISTS oauth2_registration;
DROP TABLE IF EXISTS oauth2_params;
DROP TABLE IF EXISTS oauth2_client_registration_template;
DROP TABLE IF EXISTS oauth2_client_registration;
DROP TABLE IF EXISTS oauth2_client_registration_info;
DROP TABLE IF EXISTS api_usage_state;
DROP TABLE IF EXISTS resource;
DROP TABLE IF EXISTS ota_package;
DROP TABLE IF EXISTS edge;
DROP TABLE IF EXISTS edge_event;
DROP TABLE IF EXISTS rpc;
DROP TABLE IF EXISTS queue;
DROP FUNCTION IF EXISTS to_uuid;

View File

@ -1,5 +1,23 @@
DROP FUNCTION IF EXISTS to_uuid;
DROP FUNCTION IF EXISTS create_or_update_active_alarm;
DROP FUNCTION IF EXISTS update_alarm;
DROP FUNCTION IF EXISTS acknowledge_alarm;
DROP FUNCTION IF EXISTS clear_alarm;
DROP FUNCTION IF EXISTS assign_alarm;
DROP FUNCTION IF EXISTS unassign_alarm;
DROP PROCEDURE IF EXISTS cleanup_edge_events_by_ttl;
DROP PROCEDURE IF EXISTS cleanup_timeseries_by_ttl;
DROP FUNCTION IF EXISTS delete_customer_records_from_ts_kv;
DROP VIEW IF EXISTS device_info_active_attribute_view CASCADE;
DROP VIEW IF EXISTS device_info_active_ts_view CASCADE;
DROP VIEW IF EXISTS device_info_view CASCADE;
DROP VIEW IF EXISTS alarm_info CASCADE;
DROP TABLE IF EXISTS admin_settings; DROP TABLE IF EXISTS admin_settings;
DROP TABLE IF EXISTS entity_alarm; DROP TABLE IF EXISTS entity_alarm;
DROP TABLE IF EXISTS alarm_comment;
DROP TABLE IF EXISTS alarm; DROP TABLE IF EXISTS alarm;
DROP TABLE IF EXISTS asset; DROP TABLE IF EXISTS asset;
DROP TABLE IF EXISTS audit_log; DROP TABLE IF EXISTS audit_log;
@ -8,9 +26,12 @@ DROP TABLE IF EXISTS component_descriptor;
DROP TABLE IF EXISTS customer; DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS device; DROP TABLE IF EXISTS device;
DROP TABLE IF EXISTS device_credentials; DROP TABLE IF EXISTS device_credentials;
DROP TABLE IF EXISTS event; DROP TABLE IF EXISTS rule_node_debug_event;
DROP TABLE IF EXISTS rule_chain_debug_event;
DROP TABLE IF EXISTS stats_event;
DROP TABLE IF EXISTS lc_event;
DROP TABLE IF EXISTS error_event;
DROP TABLE IF EXISTS relation; DROP TABLE IF EXISTS relation;
DROP TABLE IF EXISTS tb_user;
DROP TABLE IF EXISTS tenant; DROP TABLE IF EXISTS tenant;
DROP TABLE IF EXISTS ts_kv; DROP TABLE IF EXISTS ts_kv;
DROP TABLE IF EXISTS ts_kv_latest; DROP TABLE IF EXISTS ts_kv_latest;
@ -31,9 +52,10 @@ DROP TABLE IF EXISTS oauth2_mobile;
DROP TABLE IF EXISTS oauth2_domain; DROP TABLE IF EXISTS oauth2_domain;
DROP TABLE IF EXISTS oauth2_registration; DROP TABLE IF EXISTS oauth2_registration;
DROP TABLE IF EXISTS oauth2_params; DROP TABLE IF EXISTS oauth2_params;
DROP TABLE IF EXISTS oauth2_client_registration_template;
DROP TABLE IF EXISTS oauth2_client_registration; DROP TABLE IF EXISTS oauth2_client_registration;
DROP TABLE IF EXISTS oauth2_client_registration_info; DROP TABLE IF EXISTS oauth2_client_registration_info;
DROP TABLE IF EXISTS oauth2_client_registration_template;
DROP TABLE IF EXISTS ota_package;
DROP TABLE IF EXISTS api_usage_state; DROP TABLE IF EXISTS api_usage_state;
DROP TABLE IF EXISTS resource; DROP TABLE IF EXISTS resource;
DROP TABLE IF EXISTS firmware; DROP TABLE IF EXISTS firmware;
@ -41,3 +63,11 @@ DROP TABLE IF EXISTS edge;
DROP TABLE IF EXISTS edge_event; DROP TABLE IF EXISTS edge_event;
DROP TABLE IF EXISTS rpc; DROP TABLE IF EXISTS rpc;
DROP TABLE IF EXISTS queue; DROP TABLE IF EXISTS queue;
DROP TABLE IF EXISTS notification;
DROP TABLE IF EXISTS notification_request;
DROP TABLE IF EXISTS notification_rule;
DROP TABLE IF EXISTS notification_template;
DROP TABLE IF EXISTS notification_target;
DROP TABLE IF EXISTS user_settings;
DROP TABLE IF EXISTS user_auth_settings;
DROP TABLE IF EXISTS tb_user;

View File

@ -1,2 +1,2 @@
--PostgreSQL specific truncate to fit constraints --PostgreSQL specific truncate to fit constraints
TRUNCATE TABLE device_credentials, device, device_profile, asset, asset_profile, ota_package, rule_node_state, rule_node, rule_chain; TRUNCATE TABLE device_credentials, device, device_profile, asset, asset_profile, ota_package, rule_node_state, rule_node, rule_chain, alarm_comment, alarm, entity_alarm;

View File

@ -1,38 +0,0 @@
DROP TABLE IF EXISTS admin_settings;
DROP TABLE IF EXISTS entity_alarm;
DROP TABLE IF EXISTS alarm;
DROP TABLE IF EXISTS asset;
DROP TABLE IF EXISTS audit_log;
DROP TABLE IF EXISTS attribute_kv;
DROP TABLE IF EXISTS component_descriptor;
DROP TABLE IF EXISTS customer;
DROP TABLE IF EXISTS device;
DROP TABLE IF EXISTS device_credentials;
DROP TABLE IF EXISTS event;
DROP TABLE IF EXISTS relation;
DROP TABLE IF EXISTS tb_user;
DROP TABLE IF EXISTS tenant;
DROP TABLE IF EXISTS ts_kv;
DROP TABLE IF EXISTS ts_kv_latest;
DROP TABLE IF EXISTS ts_kv_dictionary;
DROP TABLE IF EXISTS user_credentials;
DROP TABLE IF EXISTS widget_type;
DROP TABLE IF EXISTS widgets_bundle;
DROP TABLE IF EXISTS rule_node_state;
DROP TABLE IF EXISTS rule_node;
DROP TABLE IF EXISTS rule_chain;
DROP TABLE IF EXISTS entity_view;
DROP TABLE IF EXISTS device_profile;
DROP TABLE IF EXISTS tenant_profile;
DROP TABLE IF EXISTS asset_profile;
DROP TABLE IF EXISTS dashboard;
DROP TABLE IF EXISTS edge;
DROP TABLE IF EXISTS edge_event;
DROP TABLE IF EXISTS tb_schema_settings;
DROP TABLE IF EXISTS oauth2_client_registration;
DROP TABLE IF EXISTS oauth2_client_registration_info;
DROP TABLE IF EXISTS oauth2_client_registration_template;
DROP TABLE IF EXISTS api_usage_state;
DROP TABLE IF EXISTS resource;
DROP TABLE IF EXISTS firmware;
DROP TABLE IF EXISTS queue;

View File

@ -55,6 +55,7 @@ import java.io.ByteArrayInputStream;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -157,6 +158,13 @@ abstract public class AbstractDriverBaseTest extends AbstractContainerTest {
.findFirst().orElse(null); .findFirst().orElse(null);
} }
public List<Device> getDevicesByName(List<String> deviceNames) {
List<Device> allDevices = testRestClient.getDevices(pageLink).getData();
return allDevices.stream()
.filter(device -> deviceNames.contains(device.getName()))
.collect(Collectors.toList());
}
public List<RuleChain> getRuleChainsByName(String name) { public List<RuleChain> getRuleChainsByName(String name) {
return testRestClient.getRuleChains(pageLink).getData().stream() return testRestClient.getRuleChains(pageLink).getData().stream()
.filter(s -> s.getName().equals(name)) .filter(s -> s.getName().equals(name))
@ -174,13 +182,10 @@ abstract public class AbstractDriverBaseTest extends AbstractContainerTest {
} }
public DeviceProfile getDeviceProfileByName(String name) { public DeviceProfile getDeviceProfileByName(String name) {
try {
return testRestClient.getDeviceProfiles(pageLink).getData().stream() return testRestClient.getDeviceProfiles(pageLink).getData().stream()
.filter(x -> x.getName().equals(name)).collect(Collectors.toList()).get(0); .filter(x -> x.getName().equals(name))
} catch (Exception e) { .findFirst()
log.error("No such device profile with name: " + name); .orElse(null);
return null;
}
} }
public AssetProfile getAssetProfileByName(String name) { public AssetProfile getAssetProfileByName(String name) {
@ -291,6 +296,15 @@ abstract public class AbstractDriverBaseTest extends AbstractContainerTest {
} }
} }
public void deleteDevicesByName(List<String> deviceNames) {
List<Device> devices = getDevicesByName(deviceNames);
for (Device device : devices) {
if (device != null) {
testRestClient.deleteDevice(device.getId());
}
}
}
public void deleteDeviceProfileByTitle(String deviceProfileTitle) { public void deleteDeviceProfileByTitle(String deviceProfileTitle) {
DeviceProfile deviceProfile = getDeviceProfileByName(deviceProfileTitle); DeviceProfile deviceProfile = getDeviceProfileByName(deviceProfileTitle);
if (deviceProfile != null) { if (deviceProfile != null) {

View File

@ -18,6 +18,8 @@ package org.thingsboard.server.msa.ui.pages;
import org.openqa.selenium.WebDriver; import org.openqa.selenium.WebDriver;
import org.openqa.selenium.WebElement; import org.openqa.selenium.WebElement;
import java.util.List;
public class DevicePageElements extends OtherPageElementsHelper { public class DevicePageElements extends OtherPageElementsHelper {
public DevicePageElements(WebDriver driver) { public DevicePageElements(WebDriver driver) {
super(driver); super(driver);
@ -31,7 +33,7 @@ public class DevicePageElements extends OtherPageElementsHelper {
private static final String CHOOSE_CUSTOMER_FOR_ASSIGN_FIELD = "//input[@formcontrolname='entity']"; private static final String CHOOSE_CUSTOMER_FOR_ASSIGN_FIELD = "//input[@formcontrolname='entity']";
private static final String ENTITY_FROM_DROPDOWN = "//div[@role = 'listbox']//span[text() = '%s']"; private static final String ENTITY_FROM_DROPDOWN = "//div[@role = 'listbox']//span[text() = '%s']";
private static final String CLOSE_DEVICE_DETAILS_VIEW = "//header//mat-icon[contains(text(),'close')]/parent::button"; private static final String CLOSE_DEVICE_DETAILS_VIEW = "//header//mat-icon[contains(text(),'close')]/parent::button";
private static final String SUBMIT_ASSIGN_TO_CUSTOMER_BTN = "//button[@type='submit']"; private static final String SUBMIT_BTN = "//button[@type='submit']";
private static final String ADD_DEVICE_BTN = "//mat-icon[text() = 'insert_drive_file']/parent::button"; private static final String ADD_DEVICE_BTN = "//mat-icon[text() = 'insert_drive_file']/parent::button";
private static final String HEADER_NAME_VIEW = "//header//div[@class='tb-details-title']/span"; private static final String HEADER_NAME_VIEW = "//header//div[@class='tb-details-title']/span";
private static final String ADD_DEVICE_VIEW = "//tb-device-wizard"; private static final String ADD_DEVICE_VIEW = "//tb-device-wizard";
@ -47,12 +49,17 @@ public class DevicePageElements extends OtherPageElementsHelper {
private static final String DEVICE_CUSTOMER_PAGE = DEVICE + "/ancestor::mat-row//mat-cell[contains(@class,'cdk-column-customerTitle')]/span"; private static final String DEVICE_CUSTOMER_PAGE = DEVICE + "/ancestor::mat-row//mat-cell[contains(@class,'cdk-column-customerTitle')]/span";
private static final String DEVICE_LABEL_EDIT = "//input[@formcontrolname='label']"; private static final String DEVICE_LABEL_EDIT = "//input[@formcontrolname='label']";
private static final String DEVICE_DEVICE_PROFILE_PAGE = DEVICE + "/ancestor::mat-row//mat-cell[contains(@class,'cdk-column-deviceProfileName')]/span"; private static final String DEVICE_DEVICE_PROFILE_PAGE = DEVICE + "/ancestor::mat-row//mat-cell[contains(@class,'cdk-column-deviceProfileName')]/span";
protected static final String ASSIGN_BTN = ENTITY + "/ancestor::mat-row//mat-icon[contains(text(),'assignment_ind')]/ancestor::button"; private static final String ASSIGN_BTN = ENTITY + "/ancestor::mat-row//mat-icon[contains(text(),'assignment_ind')]/ancestor::button";
protected static final String UNASSIGN_BTN = ENTITY + "/ancestor::mat-row//mat-icon[contains(text(),' assignment_return')]/ancestor::button"; private static final String UNASSIGN_BTN = ENTITY + "/ancestor::mat-row//mat-icon[contains(text(),' assignment_return')]/ancestor::button";
protected static final String ASSIGN_BTN_DETAILS_TAB = "//span[contains(text(),'Assign to customer')]/parent::button"; private static final String ASSIGN_BTN_DETAILS_TAB = "//span[contains(text(),'Assign to customer')]/parent::button";
protected static final String UNASSIGN_BTN_DETAILS_TAB = "//span[contains(text(),'Unassign from customer')]/parent::button"; private static final String UNASSIGN_BTN_DETAILS_TAB = "//span[contains(text(),'Unassign from customer')]/parent::button";
protected static final String ASSIGNED_FIELD_DETAILS_TAB = "//mat-label[text() = 'Assigned to customer']/parent::label/parent::div/input"; private static final String ASSIGNED_FIELD_DETAILS_TAB = "//mat-label[text() = 'Assigned to customer']/parent::label/parent::div/input";
protected static final String ASSIGN_MARKED_DEVICE_BTN = "//mat-icon[text() = 'assignment_ind']/parent::button"; private static final String ASSIGN_MARKED_DEVICE_BTN = "//mat-icon[text() = 'assignment_ind']/parent::button";
private static final String FILTER_BTN = "//tb-device-info-filter/button";
private static final String DEVICE_PROFILE_FIELD = "(//input[@formcontrolname='deviceProfile'])[2]";
private static final String DEVICE_STATE_SELECT = "//div[contains(@class,'tb-filter-panel')]//mat-select[@role='combobox']";
private static final String LIST_OF_DEVICES_STATE = "//div[@class='status']";
private static final String LIST_OF_DEVICES_PROFILE = "//mat-cell[contains(@class,'deviceProfileName')]";
public WebElement device(String deviceName) { public WebElement device(String deviceName) {
return waitUntilElementToBeClickable(String.format(DEVICE, deviceName)); return waitUntilElementToBeClickable(String.format(DEVICE, deviceName));
@ -82,8 +89,8 @@ public class DevicePageElements extends OtherPageElementsHelper {
return waitUntilElementToBeClickable(CLOSE_DEVICE_DETAILS_VIEW); return waitUntilElementToBeClickable(CLOSE_DEVICE_DETAILS_VIEW);
} }
public WebElement submitAssignToCustomerBtn() { public WebElement submitBtn() {
return waitUntilElementToBeClickable(SUBMIT_ASSIGN_TO_CUSTOMER_BTN); return waitUntilElementToBeClickable(SUBMIT_BTN);
} }
public WebElement addDeviceBtn() { public WebElement addDeviceBtn() {
@ -177,4 +184,24 @@ public class DevicePageElements extends OtherPageElementsHelper {
public WebElement assignMarkedDeviceBtn() { public WebElement assignMarkedDeviceBtn() {
return waitUntilVisibilityOfElementLocated(ASSIGN_MARKED_DEVICE_BTN); return waitUntilVisibilityOfElementLocated(ASSIGN_MARKED_DEVICE_BTN);
} }
public WebElement filterBtn() {
return waitUntilElementToBeClickable(FILTER_BTN);
}
public WebElement deviceProfileField() {
return waitUntilElementToBeClickable(DEVICE_PROFILE_FIELD);
}
public WebElement deviceStateSelect() {
return waitUntilElementToBeClickable(DEVICE_STATE_SELECT);
}
public List<WebElement> listOfDevicesState() {
return waitUntilVisibilityOfElementsLocated(LIST_OF_DEVICES_STATE);
}
public List<WebElement> listOfDevicesProfile() {
return waitUntilVisibilityOfElementsLocated(LIST_OF_DEVICES_PROFILE);
}
} }

View File

@ -35,7 +35,7 @@ public class DevicePageHelper extends DevicePageElements {
public void assignToCustomer(String customerTitle) { public void assignToCustomer(String customerTitle) {
chooseCustomerForAssignField().click(); chooseCustomerForAssignField().click();
entityFromDropdown(customerTitle).click(); entityFromDropdown(customerTitle).click();
submitAssignToCustomerBtn().click(); submitBtn().click();
} }
public void openCreateDeviceView() { public void openCreateDeviceView() {
@ -101,4 +101,26 @@ public class DevicePageHelper extends DevicePageElements {
deleteSelectedBtn().click(); deleteSelectedBtn().click();
warningPopUpYesBtn().click(); warningPopUpYesBtn().click();
} }
public void filterDeviceByDeviceProfile(String deviceProfileTitle) {
clearProfileFieldBtn().click();
entityFromDropdown(deviceProfileTitle).click();
submitBtn().click();
}
public void filterDeviceByState(String state) {
deviceStateSelect().click();
entityFromDropdown(" " + state + " ").click();
sleep(2); //wait until the action is counted
submitBtn().click();
}
public void filterDeviceByDeviceProfileAndState(String deviceProfileTitle, String state) {
clearProfileFieldBtn().click();
entityFromDropdown(deviceProfileTitle).click();
deviceStateSelect().click();
entityFromDropdown(" " + state + " ").click();
sleep(2); //wait until the action is counted
submitBtn().click();
}
} }

View File

@ -45,9 +45,5 @@ abstract public class AbstractDeviceTest extends AbstractDriverBaseTest {
public void delete() { public void delete() {
deleteDeviceByName(deviceName); deleteDeviceByName(deviceName);
deviceName = null; deviceName = null;
if (deviceProfileTitle != null) {
deleteDeviceProfileByTitle(deviceProfileTitle);
deviceProfileTitle = null;
}
} }
} }

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.msa.ui.tests.devicessmoke;
import io.qameta.allure.Description; import io.qameta.allure.Description;
import io.qameta.allure.Feature; import io.qameta.allure.Feature;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.msa.ui.pages.ProfilesPageElements; import org.thingsboard.server.msa.ui.pages.ProfilesPageElements;
@ -33,6 +34,16 @@ import static org.thingsboard.server.msa.ui.utils.Const.SAME_NAME_WARNING_DEVICE
@Feature("Create device") @Feature("Create device")
public class CreateDeviceTest extends AbstractDeviceTest { public class CreateDeviceTest extends AbstractDeviceTest {
@AfterMethod
public void delete() {
deleteDeviceByName(deviceName);
deviceName = null;
if (deviceProfileTitle != null) {
deleteDeviceProfileByTitle(deviceProfileTitle);
deviceProfileTitle = null;
}
}
@Test(groups = "smoke") @Test(groups = "smoke")
@Description("Add device after specifying the name (text/numbers /special characters)") @Description("Add device after specifying the name (text/numbers /special characters)")
public void createDevice() { public void createDevice() {

View File

@ -0,0 +1,105 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.msa.ui.tests.devicessmoke;
import io.qameta.allure.Description;
import io.qameta.allure.Epic;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.msa.ui.utils.DataProviderCredential;
import org.thingsboard.server.msa.ui.utils.EntityPrototypes;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.thingsboard.server.msa.ui.base.AbstractBasePage.random;
import static org.thingsboard.server.msa.ui.utils.Const.ENTITY_NAME;
@Epic("Filter devices (By device profile and state)")
public class DeviceFilterTest extends AbstractDeviceTest {
private String activeDeviceName;
private String deviceWithProfileName;
private String activeDeviceWithProfileName;
@BeforeClass
public void createTestEntities() {
DeviceProfile deviceProfile = testRestClient.postDeviceProfile(EntityPrototypes.defaultDeviceProfile(ENTITY_NAME + random()));
Device deviceWithProfile = testRestClient.postDevice("", EntityPrototypes.defaultDevicePrototype(ENTITY_NAME + random(), deviceProfile.getId()));
Device activeDevice = testRestClient.postDevice("", EntityPrototypes.defaultDevicePrototype(ENTITY_NAME + random()));
Device activeDeviceWithProfile = testRestClient.postDevice("", EntityPrototypes.defaultDevicePrototype(ENTITY_NAME + random(), deviceProfile.getId()));
DeviceCredentials deviceCredentials = testRestClient.getDeviceCredentialsByDeviceId(activeDevice.getId());
DeviceCredentials deviceCredentials1 = testRestClient.getDeviceCredentialsByDeviceId(activeDeviceWithProfile.getId());
testRestClient.postTelemetry(deviceCredentials.getCredentialsId(), JacksonUtil.toJsonNode(createPayload().toString()));
testRestClient.postTelemetry(deviceCredentials1.getCredentialsId(), JacksonUtil.toJsonNode(createPayload().toString()));
deviceProfileTitle = deviceProfile.getName();
deviceWithProfileName = deviceWithProfile.getName();
activeDeviceName = activeDevice.getName();
activeDeviceWithProfileName = activeDeviceWithProfile.getName();
}
@AfterClass
public void deleteTestEntities() {
deleteDevicesByName(List.of(deviceWithProfileName, activeDeviceName, activeDeviceWithProfileName));
deleteDeviceProfileByTitle(deviceProfileTitle);
}
@Test(groups = "smoke")
@Description("Filter by device profile")
public void filterDevicesByProfile() {
sideBarMenuView.goToDevicesPage();
devicePage.filterBtn().click();
devicePage.filterDeviceByDeviceProfile(deviceProfileTitle);
devicePage.listOfDevicesProfile().forEach(d -> assertThat(d.getText())
.as("There are only devices with the selected profile(%s) on the page", deviceProfileTitle)
.isEqualTo(deviceProfileTitle));
}
@Test(groups = "smoke", dataProviderClass = DataProviderCredential.class, dataProvider = "filterData")
@Description("Filter by state")
public void filterDevicesByState(String state) {
sideBarMenuView.goToDevicesPage();
devicePage.filterBtn().click();
devicePage.filterDeviceByState(state);
devicePage.listOfDevicesState().forEach(d -> assertThat(d.getText())
.as("There are only devices with '%s' state on the page", state)
.isEqualTo(state));
}
@Test(groups = "smoke", dataProviderClass = DataProviderCredential.class, dataProvider = "filterData")
@Description("Filter device by device profile and state")
public void filterDevicesByDeviceProfileAndState(String state) {
sideBarMenuView.goToDevicesPage();
devicePage.filterBtn().click();
devicePage.filterDeviceByDeviceProfileAndState(deviceProfileTitle, state);
devicePage.listOfDevicesProfile().forEach(d -> assertThat(d.getText())
.as("There are only devices with the selected profile(%s) on the page", deviceProfileTitle)
.isEqualTo(deviceProfileTitle));
devicePage.listOfDevicesState().forEach(d -> assertThat(d.getText())
.as("There are only devices with '%s' state on the page", state)
.isEqualTo(state));
}
}

View File

@ -45,4 +45,6 @@ public class Const {
public static final String PHONE_NUMBER_ERROR_MESSAGE = "Phone number is invalid or not possible"; public static final String PHONE_NUMBER_ERROR_MESSAGE = "Phone number is invalid or not possible";
public static final String NAME_IS_REQUIRED_MESSAGE = "Name is required."; public static final String NAME_IS_REQUIRED_MESSAGE = "Name is required.";
public static final String DEVICE_PROFILE_IS_REQUIRED_MESSAGE = "Device profile is required"; public static final String DEVICE_PROFILE_IS_REQUIRED_MESSAGE = "Device profile is required";
public static final String DEVICE_ACTIVE_STATE = "Active";
public static final String DEVICE_INACTIVE_STATE = "Inactive";
} }

View File

@ -21,6 +21,8 @@ import org.testng.annotations.DataProvider;
import static org.thingsboard.server.msa.ui.base.AbstractBasePage.getRandomNumber; import static org.thingsboard.server.msa.ui.base.AbstractBasePage.getRandomNumber;
import static org.thingsboard.server.msa.ui.base.AbstractBasePage.getRandomSymbol; import static org.thingsboard.server.msa.ui.base.AbstractBasePage.getRandomSymbol;
import static org.thingsboard.server.msa.ui.base.AbstractBasePage.random; import static org.thingsboard.server.msa.ui.base.AbstractBasePage.random;
import static org.thingsboard.server.msa.ui.utils.Const.DEVICE_ACTIVE_STATE;
import static org.thingsboard.server.msa.ui.utils.Const.DEVICE_INACTIVE_STATE;
import static org.thingsboard.server.msa.ui.utils.Const.ENTITY_NAME; import static org.thingsboard.server.msa.ui.utils.Const.ENTITY_NAME;
public class DataProviderCredential { public class DataProviderCredential {
@ -166,4 +168,12 @@ public class DataProviderCredential {
{label, newLabel, label + newLabel}, {label, newLabel, label + newLabel},
{label, Keys.CONTROL + "A" + Keys.BACK_SPACE, ""}}; {label, Keys.CONTROL + "A" + Keys.BACK_SPACE, ""}};
} }
@DataProvider(name = "filterData")
public static Object[][] getFilterData() {
return new Object[][]{
{DEVICE_ACTIVE_STATE},
{DEVICE_INACTIVE_STATE}
};
}
} }

View File

@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileTra
import org.thingsboard.server.common.data.device.profile.DeviceProfileData; import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
import org.thingsboard.server.common.data.device.profile.DisabledDeviceProfileProvisionConfiguration; import org.thingsboard.server.common.data.device.profile.DisabledDeviceProfileProvisionConfiguration;
import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChain;
@ -227,6 +228,14 @@ public class EntityPrototypes {
return device; return device;
} }
public static Device defaultDevicePrototype(String name, DeviceProfileId deviceProfileId) {
Device device = new Device();
device.setName(name + RandomStringUtils.randomAlphanumeric(7));
device.setType("DEFAULT");
device.setDeviceProfileId(deviceProfileId);
return device;
}
public static Asset defaultAssetPrototype(String name, CustomerId id) { public static Asset defaultAssetPrototype(String name, CustomerId id) {
Asset asset = new Asset(); Asset asset = new Asset();
asset.setName(name + RandomStringUtils.randomAlphanumeric(7)); asset.setName(name + RandomStringUtils.randomAlphanumeric(7));

View File

@ -192,6 +192,13 @@ metrics:
# Metrics percentiles returned by actuator for timer metrics. List of double values (divided by ,). # Metrics percentiles returned by actuator for timer metrics. List of double values (divided by ,).
percentiles: "${METRICS_TIMER_PERCENTILES:0.5}" percentiles: "${METRICS_TIMER_PERCENTILES:0.5}"
management:
endpoints:
web:
exposure:
# Expose metrics endpoint (use value 'prometheus' to enable prometheus metrics).
include: '${METRICS_ENDPOINTS_EXPOSE:info}'
service: service:
type: "${TB_SERVICE_TYPE:tb-vc-executor}" type: "${TB_SERVICE_TYPE:tb-vc-executor}"
# Unique id for this service (autogenerated if empty) # Unique id for this service (autogenerated if empty)

View File

@ -35,7 +35,9 @@ import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
@Slf4j
final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> { final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage> {
private final MqttClientImpl client; private final MqttClientImpl client;
@ -48,6 +50,7 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
if (msg.decoderResult().isSuccess()) {
switch (msg.fixedHeader().messageType()) { switch (msg.fixedHeader().messageType()) {
case CONNACK: case CONNACK:
handleConack(ctx.channel(), (MqttConnAckMessage) msg); handleConack(ctx.channel(), (MqttConnAckMessage) msg);
@ -74,6 +77,10 @@ final class MqttChannelHandler extends SimpleChannelInboundHandler<MqttMessage>
handlePubcomp(msg); handlePubcomp(msg);
break; break;
} }
} else {
log.error("[{}] Message decoding failed: {}", client.getClientConfig().getClientId(), msg.decoderResult().cause().getMessage());
ctx.close();
}
} }
@Override @Override