
I'm working on a small project in which I'm trying to integrate Kafka into a Node.js application. All the code runs inside containers on an M1 Mac. Unfortunately, I can't get the npm package kafkajs to work. Whenever I run the project, I get the following error:

KafkaJSNonRetriableError
  Caused by: KafkaJSConnectionError: Connection error: connect ECONNREFUSED 172.31.0.3:9092
    at Socket.onError (/src/node_modules/kafkajs/src/network/connection.js:149:23)
    at Socket.emit (events.js:197:13)
    at emitErrorNT (internal/streams/destroy.js:82:8)
    at emitErrorAndCloseNT (internal/streams/destroy.js:50:3)
    at processTicksAndRejections (internal/process/next_tick.js:76:17)
  name: 'KafkaJSNumberOfRetriesExceeded',
  retriable: false,
  helpUrl: undefined,
  originalError:
   { KafkaJSConnectionError: Connection error: connect ECONNREFUSED 172.31.0.3:9092
       at Socket.onError (/src/node_modules/kafkajs/src/network/connection.js:149:23)
       at Socket.emit (events.js:197:13)
       at emitErrorNT (internal/streams/destroy.js:82:8)
       at emitErrorAndCloseNT (internal/streams/destroy.js:50:3)
       at processTicksAndRejections (internal/process/next_tick.js:76:17)
     name: 'KafkaJSConnectionError',
     retriable: true,
     helpUrl: undefined,
     broker: 'kafka:9092',
     code: 'ECONNREFUSED' },
  retryCount: 5,
  retryTime: 8690

The Dockerfile used to run the app is:

ARG NODE_VERSION=11.8.0
FROM node:$NODE_VERSION-alpine as build

WORKDIR /src/
COPY package*.json .
RUN npm ci
COPY . .
CMD ["npm","start"]

I have to run the code on this Node version only, because ultimately I have to introduce Kafka into a legacy service. That limitation is also why I have to use the confluentinc Kafka image.

My docker-compose file is:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    hostname: zookeeper
    platform: linux/x86_64
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    hostname: kafka
    platform: linux/x86_64
    environment:
      - KAFKA_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"

  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafdrop
    hostname: kafdrop
    environment:
      - "KAFKA_BROKERCONNECT=kafka:29092"
      - "JVM_OPTS=-Xms32M -Xmx64M"
      - "SERVER_SERVLET_CONTEXTPATH=/"
    depends_on:
      - kafka
    ports:
      - "9000:9000"
  
  main:
    build: .
    restart: on-failure
    volumes:
      - .:/src:cached
      - /src/node_modules
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8080:8080"

The .dockerignore file:

./node_modules
Dockerfile
.dockerignore
docker-compose.yml

The Node.js application is currently a single file that runs an Express app:

const express = require('express');
const { Kafka } = require('kafkajs')

const app = express();

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: [
    'kafka:9092'
  ],
})

var adminObjFactory = async () => {
  const admin = kafka.admin()
  await admin.connect()
  return {
    topics: async () => {
      const topicLists = await admin.listTopics()
      console.log(`Lists of topics ${topicLists}`);
    },
    disconnect: async () => {
      await admin.disconnect()
    }
  }
}

const boot = async () => {
  const adminObj = await adminObjFactory();
}

boot().catch(err => console.log(err));


app.get('/', (req, res) => {
  res.send({
    'key': 'success'
  })
})


app.listen(8080, () => {
  console.log(`Example app listening on port 8080`)
})

The package.json file:

{
  "name": "kafka-demo",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "start": "node-dev ./admin.js"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "express": "^4.18.1",
    "kafkajs": "^1.16.0",
    "node-dev": "^7.4.3"
  }
}

I am able to see the Kafka broker up and running in the Kafdrop UI.

Initially the application crashes when the containers start, since the Kafka broker is not up at that point, but because of the attached volumes I'm able to live-reload once the Kafka broker container is up.
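As an alternative to relying on live reload, the startup race described above could be handled by retrying the initial connection. The following is only a minimal sketch: `retry` is a hypothetical helper (not part of kafkajs), and the attempt count and delay are arbitrary assumptions.

```javascript
// Hypothetical helper: call an async function until it succeeds,
// waiting a fixed delay between failed attempts.
const retry = async (fn, { attempts = 10, delayMs = 3000 } = {}) => {
  let lastError;
  for (let i = 1; i <= attempts; i++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      console.log(`Attempt ${i}/${attempts} failed: ${err.message}`);
      if (i < attempts) {
        // Sleep before the next attempt.
        await new Promise(resolve => setTimeout(resolve, delayMs));
      }
    }
  }
  // Every attempt failed; surface the last error to the caller.
  throw lastError;
};
```

In the application above, `boot` could then call `await retry(adminObjFactory)` instead of calling `adminObjFactory()` directly. This only papers over the ordering problem; it does not help if the broker address itself is wrong.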

  • Remove the advertised hostname (that property is deprecated). You've set `PLAINTEXT://kafka:29092` as the advertised listener, so you need to use port 29092 between containers, just like you did in kafdrop. It'd be best if you added an env var for the brokers to your node code like kafdrop, too – OneCricketeer May 27 '22 at 12:17
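The suggestion in the comment above (use the internal listener on port 29092 and read the broker list from an environment variable) might look roughly like this. It is a sketch, not a confirmed fix: `KAFKA_BROKERS` and `brokersFromEnv` are hypothetical names chosen for illustration, and the default value simply mirrors the `PLAINTEXT://kafka:29092` advertised listener from the compose file.

```javascript
// KAFKA_BROKERS is a hypothetical variable name; kafkajs does not read it itself.
// The default matches the container-to-container advertised listener.
const DEFAULT_BROKERS = 'kafka:29092';

// Turn a comma-separated broker string into the array kafkajs expects.
const brokersFromEnv = (env = process.env) =>
  (env.KAFKA_BROKERS || DEFAULT_BROKERS)
    .split(',')
    .map(broker => broker.trim())
    .filter(broker => broker.length > 0);
```

The result would be passed as the `brokers` option, i.e. `new Kafka({ clientId: 'my-app', brokers: brokersFromEnv() })`, with `KAFKA_BROKERS=kafka:29092` set under `environment` for the `main` service, in the same way `KAFKA_BROKERCONNECT` is set for kafdrop.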
