Kafka integration seems broken

Trying to get the Kafka integration working with ChirpStack running in Docker, a synthetic device, and a Kafka broker running in Docker.

I have the synthetic device talking to ChirpStack in Docker and an HTTP integration working, so I know the first part is good.

I verified that I can post and consume messages to the Kafka broker from the command line.
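
The same check can also be scripted; this is roughly the equivalent using the kafka-python package (a sketch only; I actually used the console producer/consumer shell scripts, and the topic name here is arbitrary):

    from kafka import KafkaProducer, KafkaConsumer  # pip install kafka-python

    broker = "127.0.0.1:9093"  # the port published by the Kafka container

    # Produce a single test message.
    producer = KafkaProducer(bootstrap_servers=broker)
    producer.send("connectivity-test", b"hello from the host")
    producer.flush()

    # Read it back to confirm the round-trip to the broker works.
    consumer = KafkaConsumer(
        "connectivity-test",
        bootstrap_servers=broker,
        auto_offset_reset="earliest",
        consumer_timeout_ms=5000,  # stop iterating after 5 s of silence
    )
    for msg in consumer:
        print(msg.value)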

I wanted to enable a Kafka integration. However, there is no section for this on the Applications/Integration page, so I simply enabled the Kafka integration in the ChirpStack Application Server toml file.

But when it all runs, nothing seems to reach the broker.

However, the ChirpStack Application Server log shows that a call is being made to Kafka but the connection is refused:
time="2021-11-11T04:21:10.6757893Z" level=error msg="integration/multi: integration error" ctx_id=8a9d5165-9481-458e-9b65-70aa1c4cfc78 error="writing message to kafka: dial tcp 127.0.0.1:9093: connect: connection refused" integration="*kafka.Integration"

So presumably the integration is enabled, though possibly not configured correctly.

The Kafka log shows the following error:
kafka Invalid receive (size = 1195725856 larger than 104857600)

A Stack Overflow answer suggests that 1195725856 is "GET " (GET followed by a space) encoded as a big-endian integer, implying that HTTP data is being sent to the Kafka broker.
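
That interpretation is easy to sanity-check:

    # 1195725856 == 0x47455420 == the ASCII bytes "GET " read as a
    # big-endian 32-bit integer, i.e. the start of an HTTP request line.
    assert int.from_bytes(b"GET ", "big") == 1195725856
    print(hex(1195725856))  # 0x47455420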

I then removed the HTTP integration in case only one integration could be active at a time, but the issue remains.

I've Googled extensively, but it seems to work for the majority of people, so presumably I'm making a neophyte mistake.

Can anyone suggest what setup step I’ve missed? Or point me to a good writeup on how to get the Kafka integration going.

From the logs it looks like it's using localhost (127.0.0.1) to reach Kafka. If ChirpStack is running inside a container, 127.0.0.1 refers to that container itself, so it needs to reach Kafka at an address outside of itself. Mind posting the configuration snippet, without the credential details, please?
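
For reference, when both containers are attached to the same Docker network, the brokers setting would normally point at the Kafka service name rather than localhost, roughly like this (assuming the service is called kafka and its in-network listener is on 9092); otherwise the host machine's LAN IP with the published port also works:

    [application_server.integration.kafka]
    brokers=["kafka:9092"]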

Thanks for the quick response.

Both ChirpStack and Kafka are running in containers on my local machine, so I used 127.0.0.1 as the address. Your comment made me realise that was one of my mistakes (there is still at least one more); I should have caught it, as I was bitten by the same thing when getting the HTTP integration going.

So I checked, using the command-line Kafka producer/consumer shell scripts, that using my machine's IP address would work, and it did. So I changed the broker address to my local machine's IP address. That changed the ChirpStack error. Now the error is simply:

time="2021-11-11T18:32:43.8516891Z" level=error msg="integration/multi: integration error" ctx_id=8a3b33ce-26f8-450f-bcdf-eb10c66ac886 error="writing message to kafka: kafka write errors (1/1)" integration="*kafka.Integration"

So it looks like it may be connecting but something else goes wrong.

The Kafka log shows nothing at that point in time, though I don't seem to be able to change the logging level despite adding the environment variables and rebuilding the Docker image.

My chirpstack-application-server.toml section that adds the Kafka integration is below. XXX.YYY.Z.B is now my machine's IP address:

    enabled=["mqtt", "kafka"]

    # Kafka integration.
    [application_server.integration.kafka]
    # Brokers, e.g.: localhost:9092.
    brokers=["XXX.YYY.Z.B:9093"]

    # TLS.
    #
    # Set this to true when the Kafka client must connect using TLS to the Broker.
    tls=false

    # Topic for events.
    topic="chirpstack_as"

    # Template for keys included in Kafka messages. If empty, no key is included.
    # Kafka uses the key for distributing messages over partitions. You can use
    # this to ensure some subset of messages end up in the same partition, so
    # they can be consumed in-order. And Kafka can use the key for data retention
    # decisions.  A header "event" with the event type is included in each
    # message. There is no need to parse it from the key.
    event_key_template="application.{{ .ApplicationID }}.device.{{ .DevEUI }}.event.{{ .EventType }}"

    # Username (optional).
    username=""

    # Password (optional).
    password=""

    # One of plain or scram
    mechanism="plain"
    # Only used if mechanism == scram.
    # SHA-256 or SHA-512
    algorithm="SHA-512"
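
Once messages do flow, I would expect a consumer along these lines to show the rendered keys and the "event" header described in the comments above (a sketch using the kafka-python package, assuming plaintext and no auth; I have only been testing with the console scripts so far):

    from kafka import KafkaConsumer  # pip install kafka-python

    # Read ChirpStack events from the topic configured above.
    consumer = KafkaConsumer(
        "chirpstack_as",
        bootstrap_servers=["XXX.YYY.Z.B:9093"],  # same broker address as in the toml
        auto_offset_reset="earliest",
    )

    for msg in consumer:
        # The key is rendered from event_key_template, e.g.
        # b"application.1.device.0102030405060708.event.up" (hypothetical IDs).
        headers = dict(msg.headers or [])
        # ChirpStack also sets an "event" header with the event type.
        print(msg.key, headers.get("event"), len(msg.value), "bytes")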

In case it is relevant, my docker-compose.yml file for the Kafka container is:

    version: "2"

    services:
      zookeeper:
        image: docker.io/bitnami/zookeeper:3.7
        ports:
          - "2181:2181"
        volumes:
          - "zookeeper_data:/bitnami"
        environment:
          - ALLOW_ANONYMOUS_LOGIN=yes
      kafka:
        image: docker.io/bitnami/kafka:3
        ports:
          - "9093:9093"
        volumes:
          - "kafka_data:/bitnami"
        environment:
          - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
          - ALLOW_PLAINTEXT_LISTENER=yes
          - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
          - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
          - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
          - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
          - KAFKA_LOG4J_ROOT_LOGLEVEL=DEBUG
          - KAFKA_LOG4J_LOGGERS="kafka.controller=DEBUG"

        depends_on:
          - zookeeper

    volumes:
      zookeeper_data:
        driver: local
      kafka_data:
        driver: local

Found my mistake. The KAFKA_CFG_ADVERTISED_LISTENERS line in the Kafka compose file was also using localhost. It all starts working if I use my machine's IP address instead of localhost.
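
i.e. the advertised-listeners line in the compose file now reads roughly like this, with my real address in place of XXX.YYY.Z.B:

    - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://XXX.YYY.Z.B:9093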

I'm unsure why my command-line test worked, but now that the Kafka integration does, my care factor is 0.