MQTT Application Integration

I know there’s a global MQTT integration point in ChirpStack, but I’m wondering if there happens to be any way to perform organisation-specific or application-specific MQTT integration - either in ChirpStack, or by bridging Mosquitto with 3rd-party services?

My ChirpStack has multiple Organisations, each of which has a number of Applications.

Each of these organisations has its own hosted services and would like to use its provider’s matching IoT infrastructure (i.e. AWS IoT, Google IoT Core + Pub/Sub, Azure IoT, etc.).

I’d like to route the various MQTT messages produced by the LoRaWAN devices to those endpoints, so that all the messages for customer A are sent to devices on customer A’s AWS IoT account, all of customer B’s messages are sent out to Google IoT Core, etc.

Is there any way of doing this using the ChirpStack infrastructure? Can I create my own application integration point (ideally without using the HTTP protocol method)? Or do I have to write a Python script that connects to my MQTT broker or Redis server to do all of this myself?


With the MQTT integration you can subscribe to a specific application; see the documentation. However, you will have to build your own pipeline to each individual IoT PaaS (Google, AWS, Azure). For example:

mosquitto_sub -t "application/123/#" -v  ---> consumer.py --> AWS IoT
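
As a rough illustration only (not a supported integration), such a consumer could look something like this in Go, using the eclipse/paho.mqtt.golang client with placeholder broker addresses; a real AWS IoT connection would also need the device certificate and CA loaded into the TLS config:

package main

import (
	"crypto/tls"
	"log"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	// Connect to the AWS IoT endpoint (placeholder values; a real setup would
	// load the device certificate and CA into the TLS config).
	awsOpts := mqtt.NewClientOptions().
		AddBroker("ssl://your-endpoint.iot.eu-west-1.amazonaws.com:8883").
		SetTLSConfig(&tls.Config{})
	aws := mqtt.NewClient(awsOpts)
	if token := aws.Connect(); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}

	// Connect to the local broker that ChirpStack publishes to.
	local := mqtt.NewClient(mqtt.NewClientOptions().AddBroker("tcp://localhost:1883"))
	if token := local.Connect(); token.Wait() && token.Error() != nil {
		log.Fatal(token.Error())
	}

	// Subscribe to all events of application 123 and republish them to AWS IoT.
	local.Subscribe("application/123/#", 0, func(_ mqtt.Client, msg mqtt.Message) {
		aws.Publish(msg.Topic(), 0, false, msg.Payload())
	})

	select {} // block forever
}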

Yeah - that’s more or less the answer I was expecting. Shame - I was hoping that the ChirpStack infrastructure would let you make your own integration points through the API or by launching local scripts, rather than being limited to only three options.

I’ve installed Node-RED to handle the MQTT-to-provider transfer. It seems to be a pretty powerful option with a lot of flexibility.

No ETA, but something like this will be coming. Please note that this is not going to be as efficient as the global MQTT integration: a global integration connects when the service starts and stays connected, whereas per-application integrations will connect and then disconnect as soon as the event has been handled. Without disconnecting, you might end up with many open connections.


I understand the limitations - although a user-configurable connection-pool size would be rather nice. That way, if I only have a couple of connections (or one or two used heavily rather than transiently), I’d get efficient connection patterns up until I hit the pool limit.

Glad to know it’s on its way though - please feel free to let me know when it hits!

I’m interested to learn how you would see this work :slight_smile:

Imagine the following:

  • 10 servers with ChirpStack Application Server running
  • 10 organizations
  • 10 applications per organization with MQTT integration
  • (thus in total 100 applications with MQTT integration)

If you want to keep 1 connection open per application MQTT integration, then this would be 100 MQTT connections per server, thus 1000 MQTT connections in total.

I don’t think you can pool these, as potentially each MQTT integration has different credentials.


I’m using “Connection Pooling” incorrectly - it’s more of a “Connection Cache”. I’m basically referring to a system that would keep a user-configurable number of MQTT connections on standby in a FIFO queue, rather than always disconnecting as soon as the integration has relayed a message.

On large, balanced systems (i.e. when every application has the same amount of MQTT traffic - such as the one you describe) it would likely have no benefit at all.

But on smaller or highly unbalanced deployments (where you have a small number of customers, or only a small number of your customers have an MQTT integration, or one customer has a much larger application than most of the others on the shared system), this may dramatically decrease the number of disconnection/reconnection attempts.

Let’s say you have 10 orgs with 10 applications each, as in your example, and a cache of 10 connections.

If they all had the same number of devices, with the same polling rates distributed evenly over time, there would be no real benefit - the connections would never get re-used.

But let’s say one application is 10x the size, with an update rate of once per minute rather than once per hour. That means it generates 600 times (10 × 60) the MQTT updates of its neighbours.

By caching the connection, you’d have a reasonable expectation of saving yourself literally thousands of reconnect/disconnect attempts a day. And if you make your caching system smart (using a weighted average or long-term trend of usage rather than a simple FIFO queue), you’d save considerably more.

You’d also be better prepared for any traffic that tends to ‘burst update’ - if you’re using wired Class C LoRaWAN sensors, they might all report their hourly status at around the same time, as they were powered up at around the same time. Using the cache, you’d be able to handle the burst of data without constant disconnection/reconnection, then disconnect comfortably once the burst has subsided and connections from other integrations fill the cache.


And just to confirm, the idea would be a global connection pool, or at worst a per-org pool. You’d be unlikely to go down to the individual application level.

I feel like this would be better handled at the infrastructure level than the application level. I.e. If you are in a situation with such lopsided tenancy concerns, don’t colocate them.

But there are many ways to skin this cat, for sure.


I tend to argue from extreme examples - it shows the potential utility of the feature.

And while you’re right in that they should ideally not be colocated, the fact that LoRaWAN applications can grow at wildly varying rates between clients can contribute to this sort of situation.

That, and the fact that I’m personally looking at having one AWS Lightsail instance per customer, means that for single-tenant applications this feature would be invaluable. If you made your cache equal to the number of applications you were running, there would be no disconnections and reconnections (except those due to timeouts) for MQTT or AMQP distribution.


@Optrix I’m still investigating whether this is worth the additional complexity of making these kinds of integrations available per application. For some of the integrations, there is a lot of additional overhead to connect and disconnect. E.g. for AMQP you might go from one to 14-19 packets (see: https://www.cloudamqp.com/blog/2018-01-19-part4-rabbitmq-13-common-errors.html). I have not validated this number, but it means almost 20x overhead in the worst case.

I do like your proposal about the connection cache, but this adds quite some complexity. You have to keep track of the integration usage to keep the most-used connections in the cache. But since this cache will be per server instance, there also needs to be a cache-invalidation system in place: e.g. when a user changes the MQTT credentials, you want to make sure that the connection cache is purged on all servers, not only on the server handling the API request.

I’ve personally moved to using Node-RED to manage the distribution of data, and I’m bringing up per-tenant AWS instances to ensure security on the MQTT/Redis side. But I honestly think it’s a direction the system should probably head if the intention is for it to be multi-tenant (i.e. completely different companies using the one server, rather than different departments in a single organisation).

If the expectation is for the system to be single-tenant, then of course the global integration points would likely be sufficient.

In terms of making the connection cache efficient, I’d be tempted to use the server address plus a hash of the entire set of connection options (i.e. server/port/username/password etc.) as the cache dictionary key - this way, if a user changes the connection details for the integration point, all servers will then make a new connection, and the old one will simply decay over time and disconnect gracefully.

You use the server address to help avoid hash collisions, and the hash to ensure that if ANY connection parameter is changed, new connections are created rather than outdated ones being reused.

For very small servers, it’s probably worth putting a maximum age on your cached connections too - you don’t want to keep a redundant connection around for days or weeks if it’s not being used. Perhaps something that disconnects and destroys cache entries if they haven’t been used for more than an hour?
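
To make the idea concrete, here is a very rough sketch of what I mean, with a hypothetical Conn interface and dial function standing in for the real MQTT/AMQP clients (nothing here is existing ChirpStack code):

package connectioncache

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sync"
	"time"
)

// Conn is a placeholder for whatever client the integration uses
// (an MQTT or AMQP connection, for example).
type Conn interface {
	Publish(topic string, payload []byte) error
	Close() error
}

// Options holds everything needed to open an integration connection.
type Options struct {
	Server   string
	Port     int
	Username string
	Password string
}

// key combines the server address with a hash over the full option set, so a
// change to any parameter yields a new entry instead of reusing a stale connection.
func key(o Options) string {
	sum := sha256.Sum256([]byte(fmt.Sprintf("%s|%d|%s|%s", o.Server, o.Port, o.Username, o.Password)))
	return o.Server + ":" + hex.EncodeToString(sum[:])
}

type entry struct {
	conn     Conn
	lastUsed time.Time
}

type Cache struct {
	mu      sync.Mutex
	entries map[string]*entry
	dial    func(Options) (Conn, error)
	maxIdle time.Duration
}

func New(dial func(Options) (Conn, error), maxIdle time.Duration) *Cache {
	c := &Cache{
		entries: make(map[string]*entry),
		dial:    dial,
		maxIdle: maxIdle,
	}
	go c.janitor()
	return c
}

// Get returns a cached connection for the given options, dialing a new one
// if none exists yet.
func (c *Cache) Get(o Options) (Conn, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	k := key(o)
	if e, ok := c.entries[k]; ok {
		e.lastUsed = time.Now()
		return e.conn, nil
	}

	conn, err := c.dial(o)
	if err != nil {
		return nil, err
	}
	c.entries[k] = &entry{conn: conn, lastUsed: time.Now()}
	return conn, nil
}

// janitor periodically closes and removes connections that have not been
// used for longer than maxIdle (e.g. one hour).
func (c *Cache) janitor() {
	for range time.Tick(time.Minute) {
		c.mu.Lock()
		for k, e := range c.entries {
			if time.Since(e.lastUsed) > c.maxIdle {
				e.conn.Close()
				delete(c.entries, k)
			}
		}
		c.mu.Unlock()
	}
}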

But you could already separate the data completely, in a secure way, if you want to:

Option 1

Have a central MQTT broker and integrate it with the PostgreSQL database so that users can log in with their own credentials and only see their own data. In this case you might be interested in the Go mosquitto auth plugin. I’m also open to discussing how this could be made easier. For gateways I’m planning to implement automatic TLS certificate generation, which works with most MQTT brokers (e.g. you can use the CN as user ID and then create an ACL with the user ID in the topic template).

Option 2

Use the per-application HTTP integration to post data to an external endpoint. This could, for example, be a GCP Cloud Function which publishes the data to GCP Pub/Sub. The same would work for other cloud providers.


My user-base is largely going to want to connect their LoRaWAN data to cloud IoT infrastructure - namely AWS IoT Core, Pub/Sub and Azure IoT - rather than use the local MQTT broker. But I was hoping to use a single server on our end to handle this.

My hope was that the product would let me quickly and easily bridge an org’s data to a destination - and the HTTP endpoint does allow this, but it requires someone to get in there and fiddle with the plumbing.

Now, this is a biggish change, but perhaps the ideal goal is to make the integration engine pluggable? Let’s say there was a folder of configuration files where you could register your own integration endpoints, along with back-end services that handled all of the communication required (those services largely written by the community, not by you).

ChirpStack would display HTML pages from the integration plugin to allow configuration - i.e. setting the AWS IoT certificates - and would store JSON-encoded form results.

When the integration services restart or get an update signal (via the back-end broker), they would call an API endpoint in ChirpStack. That API would give a list of configuration options (from the form) and the associated MQTT paths to listen to (i.e. the MQTT path to the application).

Then those services mind their own business pushing data out to you-don’t-care-where - all your software really has to do is keep the services informed of the configuration and point-set.

This way you could get really crazy with integrations - they could be attached globally, to an org, or to an application - you could even theoretically connect one to a device type or a gateway, although I’m not really sure there’d be any call for that.

They’d also be completely configurable from the web frontend, making for a seamless-looking experience, but you and the rest of the ChirpStack devs wouldn’t have to worry about maintaining dozens of different integrations as part of the core product - you’d stick to your core set of message brokers and perhaps a couple of your most important integrations, and the community would take care of the rest.

If all comms are through an MQTT broker, you could even run these service endpoints on remote systems, allowing users to scale up big distribution workloads (bring up multiple machines to distribute data to AWS); or, vice versa, you could have a single AWS IoT integration service handling the traffic from multiple ChirpStack instances - pretty flexible.
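
Purely to illustrate the shape of the idea (everything below is hypothetical, not an existing ChirpStack API), the contract between ChirpStack and a plugin service might be as small as:

package plugin

import "context"

// Integration is a hypothetical contract a community-maintained integration
// service could implement. ChirpStack itself would only need to push
// configuration (the JSON blob from the web form) and the MQTT topics the
// service should consume; everything downstream is the plugin's business.
type Integration interface {
	// Configure receives the JSON-encoded form results stored by ChirpStack.
	Configure(ctx context.Context, config []byte) error

	// Topics returns the MQTT topic filters this integration should subscribe
	// to, e.g. "application/123/#".
	Topics() []string

	// HandleEvent forwards a single event (uplink, join, ack, ...) to the
	// external system (AWS IoT, Pub/Sub, Azure IoT Hub, ...).
	HandleEvent(ctx context.Context, topic string, payload []byte) error

	// Close releases any connections held by the integration.
	Close() error
}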

Please note that this usually means that you have to create the device both in ChirpStack and in, for example, GCP Cloud IoT Core.

It also means that the MQTT integration needs to support various flavors of MQTT authentication. GCP Cloud IoT Core uses a certificate for generating tokens that must be periodically renewed. The ChirpStack Gateway Bridge has support for connecting to “regular” MQTT brokers, GCP Cloud IoT Core and the Azure IoT Hub, but you must pre-configure the authentication scheme (see chirpstack-gateway-bridge/internal/integration/mqtt/auth at master · chirpstack/chirpstack-gateway-bridge · GitHub).


Continuing with the GCP Cloud IoT Core as an example, I believe the setup you expect to work would be:

[MQTT integration] --> [Cloud IoT Core] --> Cloud Pub/Sub

What about the following alternative?

[HTTP integration] --> [Cloud Function] --> Cloud Pub/Sub

I believe the same could be implemented for other cloud providers.

The advantages I see are:

  • Less complexity
  • Less potential overhead (connection “cache” / MQTT connects & disconnects)
  • No need to mirror the ChirpStack inventory with (e.g.) GCP Cloud IoT Core

The trick is the writing/configuration of the ‘cloud function’ more than anything else - it would just be really nice to avoid that complexity. Node-RED does this quite well by being the middle-man between MQTT and other protocols - it would be even nicer if ChirpStack could do this by itself.

In the Google case you’re quite correct - it’s more efficient to go direct to Pub/Sub than to go via IoT Core. And yes, the authentication methods differ, with AWS asking for certs and Google asking for JWT tokens. But again - a good pluggable system means you don’t need to care about any of that.

With AWS I believe the use of the device shadow service is optional, because they actually use an MQTT-like system in their backend. IoT Core does prefer you to use its device-based system though, which means an annoying level of duplication - which is why the direct-to-Pub/Sub idea is better.

Why would the HTTP integration be more efficient than the plugin system I suggested, though? An HTTP-based method requires sending multiple packets for a TCP connection/handshake and possibly SSL/TLS negotiation, transmission of the data, then closing the connection, all triggered by your server software.

The plugin-based system utilises the existing MQTT broker for communication, which means that almost everything is removed from your application. The service could communicate directly with the target (i.e. directly with Pub/Sub). This means fewer packets transmitted on the network, fewer connections/disconnections, less overall CPU time consumed (although there’d probably be no difference in CPU on the ChirpStack instance itself) and significantly easier configuration for the end user.

Please note that I’m not saying the existing system is wrong or buggy - the software is great. This is all just wish-list stuff, trying to make a system that works for users with little experience - my main job involves me making software that mechanical and electrical engineers can use rather than IT professionals.

While I do like your plugin proposal, please note that adding a new integration requires:

  • Updating the chirpstack-api package
  • Implementing API endpoints
  • Updating the web-interface

While integrations could be made generic (e.g. an integration type plus a blob for the configuration, which is how an integration is already stored in the db), making these things pluggable in the web-interface is a bit more tricky. It is a path I would rather not go down. It would also require configuring each integration individually, as they would be running as separate services. It is not unthinkable that some integrations would store their own state and need their own database, etc. I don’t think this would make it easier for a lot of users to get started.

Actually, I really appreciate your feedback! While we might not seem to agree on this topic, it is great input and food for thought :slight_smile:


Slightly off-topic: in the next ChirpStack Application Server test-release I will update the HTTP integration to a single endpoint (with backwards compatibility for already-configured per-event endpoints). With that, you could use the following GCP Cloud Function to publish directly to Pub/Sub from an HTTP integration:

go.mod:

module example.com/gcp-http-integration

go 1.13

require cloud.google.com/go/pubsub v1.4.0

function.go:

package chirpstack

import (
	"context"
	"io/ioutil"
	"log"
	"net/http"

	"cloud.google.com/go/pubsub"
)

// Configuration
const (
	projectID = "your-project-id"
	topicName = "your-pub-sub-topic-name"
)

// State
var (
	client *pubsub.Client
	topic  *pubsub.Topic
)

func init() {
	var err error

	log.Println("configure client")
	client, err = pubsub.NewClient(context.Background(), projectID)
	if err != nil {
		panic(err)
	}

	log.Println("configure topic")
	topic = client.Topic(topicName)

	log.Println("check if topic exists")
	ok, err := topic.Exists(context.Background())
	if err != nil {
		panic(err)
	}
	if !ok {
		panic("topic does not exist")
	}
}

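// Request is the Cloud Function entry point for the HTTP integration. It
// reads the event payload from the request body and publishes it to the
// configured Pub/Sub topic, copying the eventType query parameter into a
// message attribute.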
func Request(w http.ResponseWriter, r *http.Request) {
	b, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("read request body: %s", err)
		return
	}

	eventType := r.URL.Query().Get("eventType")
	if eventType == "" {
		log.Printf("empty eventType received")
		return
	}

	res := topic.Publish(r.Context(), &pubsub.Message{
		Data: b,
		Attributes: map[string]string{
			"eventType": eventType,
		},
	})
	if _, err := res.Get(r.Context()); err != nil {
		log.Printf("get pub/sub respone error: %s", err)
		return
	}
}

Slightly off-topic as this is not related to MQTT, but this will be included in the next ChirpStack Application Server (test) release:

This should make it a lot easier to integrate with cloud providers (AWS, Azure and GCP). These integrations will all be using HTTP(S) calls, so the connection-pool/-cache concern does not apply.


Ahh, wonderful - that will take care of most of my applications then.

The only remaining one is Power BI, but they’ve got several issues with their JSON endpoint that will probably need to be dealt with before you could build a proper integration anyway (I’ve got two outstanding bug reports about their Streaming Data API that I’m hoping they’ll get around to addressing).

Hi @brocaar

Can we create an integration for our custom geolocation service Loccarto (https://loccarto.io) and make a pull request to the chirpstack-application-server repository on GitHub?

Thank You