HTTP integration with Confluent Kafka API

Greetings!

Our department has been building out a LoRaWAN infrastructure with ChirpStack, and we also have a Kafka infrastructure for ingesting our data into research processes and experiments. The HTTP integration should work, except that we have the Confluent REST API and HAProxy running in front of our clusters. HAProxy doesn’t seem to be a problem. The Confluent Kafka REST API, however, appears to have requirements on the JSON format. We’ve tested with hookbin and others (as suggested in other forum posts) to confirm the JSON was posting successfully, but against the Confluent Kafka REST API it would always fail. We captured the JSON from the MQTT stream and replayed it with curl, with no success: every request returned a 422 error with “Unrecognized field: applicationID”. We then manually reformatted the JSON so that it is wrapped as '{"records":[{"value": …}]}', and the POST was accepted.

Here are the before and after commands (internal data removed).

Before:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"applicationID":"4","applicationName":"<app_name>","deviceName":"dev_name","devEUI":"dev_eui","rxInfo":[{"gatewayID":"gateway_ID","uplinkID":"a03f1db5-c2f6-40f0-9602-800c6df2c742","name":"gateway_name","rssi":-55,"loRaSNR":13.5,"location":{"latitude":<lat>,"longitude":<long>,"altitude":137}}],"txInfo":{"frequency":904900000,"dr":0},"adr":true,"fCnt":8515,"fPort":1,"data":"AWf/VQJolQMBBA==","object":{"digitalOutput":{"3":4},"temperatureSensor":{"1":-17.1},"humiditySensor":{"2":74.5}}}' "http://<kafka_url>:8082/topics/<topic_name>"

After:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"records":[{"value":{"applicationID":"4","applicationName":"<app_name>","deviceName":"dev_name","devEUI":"dev_eui","rxInfo":[{"gatewayID":"gateway_ID","uplinkID":"a03f1db5-c2f6-40f0-9602-800c6df2c742","name":"gateway_name","rssi":-55,"loRaSNR":13.5,"location":{"latitude":<lat>,"longitude":<long>,"altitude":137}}],"txInfo":{"frequency":904900000,"dr":0},"adr":true,"fCnt":8515,"fPort":1,"data":"AWf/VQJolQMBBA==","object":{"digitalOutput":{"3":4},"temperatureSensor":{"1":-17.1},"humiditySensor":{"2":74.5}}}}]}' "http://<kafka_url>:8082/topics/<topic_name>"
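
To make the transformation explicit, here is a minimal Python sketch of the wrapping we did by hand. The function name wrap_for_kafka_rest is hypothetical and the sample payload is shortened. The sketch only shows the envelope shape that our REST API accepted. It is not something ChirpStack does today, as far as we know.

```python
import json


def wrap_for_kafka_rest(payload: str) -> str:
    """Wrap one uplink JSON document in the {"records":[{"value": ...}]} envelope.

    Hypothetical helper: the name and signature are ours, not part of
    ChirpStack or Confluent.
    """
    event = json.loads(payload)  # fails early if the payload is not valid JSON
    envelope = {"records": [{"value": event}]}
    # Compact separators keep the body in the same style as the curl examples.
    return json.dumps(envelope, separators=(",", ":"))


if __name__ == "__main__":
    # Shortened sample uplink, using field names from the payload above.
    uplink = '{"applicationID":"4","fCnt":8515,"fPort":1,"data":"AWf/VQJolQMBBA=="}'
    print(wrap_for_kafka_rest(uplink))
```

The result would be sent as the POST body with the same Content-Type header as in the curl commands above.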

Is there a way to have the HTTP integration wrap the JSON in the required {"records":[{"value": …}]} envelope? Being able to do this in the HTTP integration itself would save us a lot of scripting and workarounds.

Thanks in advance!