Fixes to Kafka JS headers issue
This commit is contained in:
		
							parent
							
								
									beb5d1d0ba
								
							
						
					
					
						commit
						30a2d19d2d
					
				@ -59,7 +59,7 @@ JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function(message) {
 | 
				
			|||||||
        } else if (request.releaseRequest) {
 | 
					        } else if (request.releaseRequest) {
 | 
				
			||||||
            this.processReleaseRequest(requestId, responseTopic, headers, request.releaseRequest);
 | 
					            this.processReleaseRequest(requestId, responseTopic, headers, request.releaseRequest);
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            logger.error('[%s] Unknown request recevied!', requestId);
 | 
					            logger.error('[%s] Unknown request received!', requestId);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    } catch (err) {
 | 
					    } catch (err) {
 | 
				
			||||||
 | 
				
			|||||||
@ -41,8 +41,6 @@ function KafkaProducer() {
 | 
				
			|||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let headersData = headers.data;
 | 
					 | 
				
			||||||
        headersData = Object.fromEntries(Object.entries(headersData).map(([key, value]) => [key, Buffer.from(value)]));
 | 
					 | 
				
			||||||
        return producer.send(
 | 
					        return producer.send(
 | 
				
			||||||
            {
 | 
					            {
 | 
				
			||||||
                topic: responseTopic,
 | 
					                topic: responseTopic,
 | 
				
			||||||
@ -50,7 +48,7 @@ function KafkaProducer() {
 | 
				
			|||||||
                    {
 | 
					                    {
 | 
				
			||||||
                        key: scriptId,
 | 
					                        key: scriptId,
 | 
				
			||||||
                        value: rawResponse,
 | 
					                        value: rawResponse,
 | 
				
			||||||
                        headers: headersData
 | 
					                        headers: headers.data
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                ]
 | 
					                ]
 | 
				
			||||||
            });
 | 
					            });
 | 
				
			||||||
@ -96,15 +94,10 @@ function KafkaProducer() {
 | 
				
			|||||||
            eachMessage: async ({topic, partition, message}) => {
 | 
					            eachMessage: async ({topic, partition, message}) => {
 | 
				
			||||||
                let headers = message.headers;
 | 
					                let headers = message.headers;
 | 
				
			||||||
                let key = message.key;
 | 
					                let key = message.key;
 | 
				
			||||||
                let data = message.value;
 | 
					 | 
				
			||||||
                let msg = {};
 | 
					                let msg = {};
 | 
				
			||||||
 | 
					 | 
				
			||||||
                headers = Object.fromEntries(
 | 
					 | 
				
			||||||
                    Object.entries(headers).map(([key, value]) => [key, [...value]]));
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                msg.key = key.toString('utf8');
 | 
					                msg.key = key.toString('utf8');
 | 
				
			||||||
                msg.data = [...data];
 | 
					                msg.data = message.value;
 | 
				
			||||||
                msg.headers = {data: headers}
 | 
					                msg.headers = {data: headers};
 | 
				
			||||||
                messageProcessor.onJsInvokeMessage(msg);
 | 
					                messageProcessor.onJsInvokeMessage(msg);
 | 
				
			||||||
            },
 | 
					            },
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user