diff --git a/application/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/application/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
new file mode 100644
index 0000000000..6afc99c69d
--- /dev/null
+++ b/application/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Content of this file was modified to addresses the issue https://issues.apache.org/jira/browse/KAFKA-4090
+ *
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.memory.MemoryPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.thingsboard.server.common.data.exception.ThingsboardKafkaClientError;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ScatteringByteChannel;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/**
+ * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
+ */
+public class NetworkReceive implements Receive {
+
+    public final static String UNKNOWN_SOURCE = "";
+    public final static int UNLIMITED = -1;
+    public final static int TB_MAX_REQUESTED_BUFFER_SIZE = 100 * 1024 * 1024;
+    public final static int TB_LOG_REQUESTED_BUFFER_SIZE = 10 * 1024 * 1024;
+    private static final Logger log = LoggerFactory.getLogger(NetworkReceive.class);
+    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
+    private final String source;
+    private final ByteBuffer size;
+    private final int maxSize;
+    private final MemoryPool memoryPool;
+    private int requestedBufferSize = -1;
+    private ByteBuffer buffer;
+
+
+    public NetworkReceive(String source, ByteBuffer buffer) {
+        this.source = source;
+        this.buffer = buffer;
+        this.size = null;
+        this.maxSize = TB_MAX_REQUESTED_BUFFER_SIZE;
+        this.memoryPool = MemoryPool.NONE;
+    }
+
+    public NetworkReceive(String source) {
+        this.source = source;
+        this.size = ByteBuffer.allocate(4);
+        this.buffer = null;
+        this.maxSize = TB_MAX_REQUESTED_BUFFER_SIZE;
+        this.memoryPool = MemoryPool.NONE;
+    }
+
+    public NetworkReceive(int maxSize, String source) {
+        this.source = source;
+        this.size = ByteBuffer.allocate(4);
+        this.buffer = null;
+        this.maxSize = getMaxSize(maxSize);
+        this.memoryPool = MemoryPool.NONE;
+    }
+
+    public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
+        this.source = source;
+        this.size = ByteBuffer.allocate(4);
+        this.buffer = null;
+        this.maxSize = getMaxSize(maxSize);
+        this.memoryPool = memoryPool;
+    }
+
+    public NetworkReceive() {
+        this(UNKNOWN_SOURCE);
+    }
+
+    @Override
+    public String source() {
+        return source;
+    }
+
+    @Override
+    public boolean complete() {
+        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
+    }
+
+    public long readFrom(ScatteringByteChannel channel) throws IOException {
+        int read = 0;
+        if (size.hasRemaining()) {
+            int bytesRead = channel.read(size);
+            if (bytesRead < 0)
+                throw new EOFException();
+            read += bytesRead;
+            if (!size.hasRemaining()) {
+                size.rewind();
+                int receiveSize = size.getInt();
+                if (receiveSize < 0)
+                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
+                if (maxSize != UNLIMITED && receiveSize > maxSize) {
+                    var t = Thread.currentThread();
+                    throw new ThingsboardKafkaClientError("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
+                }
+                requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
+                if (receiveSize == 0) {
+                    buffer = EMPTY_BUFFER;
+                }
+            }
+        }
+        if (buffer == null && requestedBufferSize != -1) { //we know the size we want but havent been able to allocate it yet
+            if (requestedBufferSize > TB_LOG_REQUESTED_BUFFER_SIZE) {
+                String stackTrace = Arrays.stream(Thread.currentThread().getStackTrace()).map(StackTraceElement::toString).collect(Collectors.joining("|"));
+                log.error("Allocating buffer of size {} for source {}", requestedBufferSize, source);
+                log.error("Stack Trace: {}", stackTrace);
+            }
+            buffer = memoryPool.tryAllocate(requestedBufferSize);
+            if (buffer == null)
+                log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
+        }
+        if (buffer != null) {
+            int bytesRead = channel.read(buffer);
+            if (bytesRead < 0)
+                throw new EOFException();
+            read += bytesRead;
+        }
+
+        return read;
+    }
+
+    @Override
+    public boolean requiredMemoryAmountKnown() {
+        return requestedBufferSize != -1;
+    }
+
+    @Override
+    public boolean memoryAllocated() {
+        return buffer != null;
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        if (buffer != null && buffer != EMPTY_BUFFER) {
+            memoryPool.release(buffer);
+            buffer = null;
+        }
+    }
+
+    public ByteBuffer payload() {
+        return this.buffer;
+    }
+
+    public int bytesRead() {
+        if (buffer == null)
+            return size.position();
+        return buffer.position() + size.position();
+    }
+
+    /**
+     * Returns the total size of the receive including payload and size buffer
+     * for use in metrics. This is consistent with {@link NetworkSend#size()}
+     */
+    public int size() {
+        return payload().limit() + size.limit();
+    }
+
+    private int getMaxSize(int maxSize) {
+        return maxSize == UNLIMITED ? TB_MAX_REQUESTED_BUFFER_SIZE : Math.min(maxSize, TB_MAX_REQUESTED_BUFFER_SIZE);
+    }
+
+}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/exception/ThingsboardKafkaClientError.java b/common/data/src/main/java/org/thingsboard/server/common/data/exception/ThingsboardKafkaClientError.java
new file mode 100644
index 0000000000..da54431742
--- /dev/null
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/exception/ThingsboardKafkaClientError.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright © 2016-2022 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.common.data.exception;
+
+public class ThingsboardKafkaClientError extends Error {
+
+    public ThingsboardKafkaClientError(String message) {
+        super(message);
+    }
+}
diff --git a/pom.xml b/pom.xml
index 3ba9a0fd28..d572580132 100755
--- a/pom.xml
+++ b/pom.xml
@@ -97,6 +97,8 @@
         
         5.0.2
         0.2.1
+        
         2.8.0
         4.1.1
         2.57
@@ -807,6 +809,7 @@
                             **/*.patch
                             **/apache/cassandra/io/**
                             .run/**
+                            **/NetworkReceive.java
                         
                         
                             JAVADOC_STYLE
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java
index 65c05a8dc3..5acf944c98 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/kafka/TbKafkaNode.java
@@ -23,10 +23,12 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.springframework.util.ReflectionUtils;
 import org.thingsboard.rule.engine.api.RuleNode;
 import org.thingsboard.rule.engine.api.TbContext;
 import org.thingsboard.rule.engine.api.TbNode;
@@ -34,10 +36,12 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 import org.thingsboard.rule.engine.api.TbNodeException;
 import org.thingsboard.rule.engine.api.TbRelationTypes;
 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
+import org.thingsboard.server.common.data.exception.ThingsboardKafkaClientError;
 import org.thingsboard.server.common.data.plugin.ComponentType;
 import org.thingsboard.server.common.msg.TbMsg;
 import org.thingsboard.server.common.msg.TbMsgMetaData;
 
+import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.Properties;
@@ -62,16 +66,23 @@ public class TbKafkaNode implements TbNode {
     private static final String TOPIC = "topic";
     private static final String ERROR = "error";
     public static final String TB_MSG_MD_PREFIX = "tb_msg_md_";
+    private static final Field IO_THREAD_FIELD = ReflectionUtils.findField(KafkaProducer.class, "ioThread");
+
+    static {
+        IO_THREAD_FIELD.setAccessible(true);
+    }
 
     private TbKafkaNodeConfiguration config;
     private boolean addMetadataKeyValuesAsKafkaHeaders;
     private Charset toBytesCharset;
 
     private Producer, String> producer;
+    private Throwable initError;
 
     @Override
     public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
         this.config = TbNodeUtils.convert(configuration, TbKafkaNodeConfiguration.class);
+        this.initError = null;
         Properties properties = new Properties();
         properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-tb-kafka-node-" + ctx.getSelfId().getId().toString() + "-" + ctx.getServiceId());
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
@@ -89,6 +100,13 @@ public class TbKafkaNode implements TbNode {
         toBytesCharset = config.getKafkaHeadersCharset() != null ? Charset.forName(config.getKafkaHeadersCharset()) : StandardCharsets.UTF_8;
         try {
             this.producer = new KafkaProducer<>(properties);
+            Thread ioThread = (Thread) ReflectionUtils.getField(IO_THREAD_FIELD, producer);
+            ioThread.setUncaughtExceptionHandler((thread, throwable) -> {
+                if (throwable instanceof ThingsboardKafkaClientError) {
+                    initError = throwable;
+                    destroy();
+                }
+            });
         } catch (Exception e) {
             throw new TbNodeException(e);
         }
@@ -98,10 +116,14 @@ public class TbKafkaNode implements TbNode {
     public void onMsg(TbContext ctx, TbMsg msg) {
         String topic = TbNodeUtils.processPattern(config.getTopicPattern(), msg);
         try {
-            ctx.getExternalCallExecutor().executeAsync(() -> {
-                publish(ctx, msg, topic);
-                return null;
-            });
+            if (initError != null) {
+                ctx.tellFailure(msg, new RuntimeException("Failed to initialize Kafka rule node producer: " + initError.getMessage()));
+            } else {
+                ctx.getExternalCallExecutor().executeAsync(() -> {
+                    publish(ctx, msg, topic);
+                    return null;
+                });
+            }
         } catch (Exception e) {
             ctx.tellFailure(msg, e);
         }