I'm working on a small project in which I'm integrating Kafka into a Node.js application. All of the code runs in containers on an M1 Mac. Unfortunately, I can't get the npm package kafkajs to work: whenever I run the project I get the following error
KafkaJSNonRetriableError
Caused by: KafkaJSConnectionError: Connection error: connect ECONNREFUSED 172.31.0.3:9092
at Socket.onError (/src/node_modules/kafkajs/src/network/connection.js:149:23)
at Socket.emit (events.js:197:13)
at emitErrorNT (internal/streams/destroy.js:82:8)
at emitErrorAndCloseNT (internal/streams/destroy.js:50:3)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
name: 'KafkaJSNumberOfRetriesExceeded',
retriable: false,
helpUrl: undefined,
originalError:
{ KafkaJSConnectionError: Connection error: connect ECONNREFUSED 172.31.0.3:9092
at Socket.onError (/src/node_modules/kafkajs/src/network/connection.js:149:23)
at Socket.emit (events.js:197:13)
at emitErrorNT (internal/streams/destroy.js:82:8)
at emitErrorAndCloseNT (internal/streams/destroy.js:50:3)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
name: 'KafkaJSConnectionError',
retriable: true,
helpUrl: undefined,
broker: 'kafka:9092',
code: 'ECONNREFUSED' },
retryCount: 5,
retryTime: 8690
The Dockerfile used to run the app is:
ARG NODE_VERSION=11.8.0
FROM node:$NODE_VERSION-alpine as build
WORKDIR /src/
COPY package*.json ./
RUN npm ci
COPY . .
CMD ["npm","start"]
I have to stay on this Node version because I'm ultimately introducing Kafka into a legacy service; the same constraint is why I have to use the confluentinc Kafka images.
My docker-compose file is:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    hostname: zookeeper
    platform: linux/x86_64
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    hostname: kafka
    platform: linux/x86_64
    environment:
      - KAFKA_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafdrop
    hostname: kafdrop
    environment:
      - "KAFKA_BROKERCONNECT=kafka:29092"
      - "JVM_OPTS=-Xms32M -Xmx64M"
      - "SERVER_SERVLET_CONTEXTPATH=/"
    depends_on:
      - kafka
    ports:
      - "9000:9000"
  main:
    build: .
    restart: on-failure
    volumes:
      - .:/src:cached
      - /src/node_modules
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8080:8080"
The .dockerignore file:
./node_modules
Dockerfile
.dockerignore
docker-compose.yml
The Node.js application is currently a single file that runs an Express app.
const express = require('express');
const { Kafka } = require('kafkajs');

const app = express();

// The broker address matches the kafka service name from docker-compose
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka:9092'],
});

const adminObjFactory = async () => {
  const admin = kafka.admin();
  await admin.connect();
  return {
    topics: async () => {
      const topicLists = await admin.listTopics();
      console.log(`Lists of topics ${topicLists}`);
    },
    disconnect: async () => {
      await admin.disconnect();
    },
  };
};

const boot = async () => {
  // Connecting the admin client is enough to reproduce the error
  const adminObj = await adminObjFactory();
};

boot().catch(err => console.log(err));

app.get('/', (req, res) => {
  res.send({
    key: 'success',
  });
});

app.listen(8080, () => {
  console.log(`Example app listening on port 8080`);
});
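For reference, the factory is meant to be consumed roughly like this (a sketch only; the run wrapper is illustrative and not part of the file yet):

// Illustrative usage of adminObjFactory (not in the current file):
const run = async () => {
  const adminObj = await adminObjFactory();
  await adminObj.topics();     // logs the topic list via admin.listTopics()
  await adminObj.disconnect(); // closes the admin connection
};

run().catch(err => console.log(err));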
The package.json file:
{
  "name": "kafka-demo",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "start": "node-dev ./admin.js"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "express": "^4.18.1",
    "kafkajs": "^1.16.0",
    "node-dev": "^7.4.3"
  }
}
I am able to see the Kafka broker up and running in the Kafdrop UI.
Initially the application crashes when the containers start, since the Kafka broker isn't up yet; thanks to the attached volumes, though, I can live-reload once the broker container is up.
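For what it's worth, kafkajs also accepts a retry option on the client, which should soften that initial crash while the broker starts; the numbers below are illustrative placeholders, not values from my current setup:

const { Kafka } = require('kafkajs');

// Sketch: tune kafkajs's built-in exponential backoff so the client
// survives the window where the broker container is still starting.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka:9092'],
  retry: {
    initialRetryTime: 300, // ms before the first retry
    retries: 10,           // keep retrying while the broker boots
  },
});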