What are the options available for downlink commands sent to edge devices?
The chirpstack-application-server has a GUI within the dashboard for queueing downlink data, but I need something more automated. My ideal integration would be an InfluxDB database that acts as a “queue” for a particular device. Upon reception of an uplink, ChirpStack queries a bucket and sends the frame it finds, if any. This would work as Class A polling.
Another integration would be MQTT. Ideally, I send the downlink frame at any time, and ChirpStack maintains an internal queue of these frames until the device’s next polling window.
Are these integrations available, or something similar? Thank you.
I think for these use-cases, it would be best to implement an application or script which performs these actions for you. E.g. you could use Node-RED to build a flow which on uplink enqueues a downlink (if needed) through the ChirpStack API.
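As a rough illustration of the "enqueue on uplink" idea in Python rather than Node-RED, here is a minimal sketch. It only covers the queueing logic: `PendingDownlinks`, `push`, `on_uplink` and the `enqueue` callback are hypothetical names, and the callback is assumed to wrap whatever call actually hands the frame to ChirpStack (for example a gRPC enqueue request). How the uplink event reaches `on_uplink` (MQTT, HTTP integration, etc.) is left out.

```python
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Optional

# Hypothetical callback signature: (dev_eui, f_port, payload) -> None.
# In a real setup this would wrap the ChirpStack API call.
EnqueueFn = Callable[[str, int, bytes], None]


class PendingDownlinks:
    """Per-device FIFO of downlink payloads, released one frame per uplink."""

    def __init__(self, enqueue: EnqueueFn, f_port: int = 1) -> None:
        self._enqueue = enqueue
        self._f_port = f_port
        self._queues: Dict[str, Deque[bytes]] = defaultdict(deque)

    def push(self, dev_eui: str, payload: bytes) -> None:
        """Store a payload until the device next sends an uplink."""
        self._queues[dev_eui.lower()].append(payload)

    def on_uplink(self, dev_eui: str) -> Optional[bytes]:
        """Call this for every uplink; forwards the oldest pending frame, if any."""
        key = dev_eui.lower()
        queue = self._queues.get(key)
        if not queue:
            return None
        payload = queue[0]
        # Forward first, remove afterwards, so a failed enqueue keeps the frame.
        self._enqueue(key, self._f_port, payload)
        queue.popleft()
        return payload
```

Usage would look like `pending = PendingDownlinks(my_enqueue)`, then `pending.push(dev_eui, b"\x01")` whenever a command is produced and `pending.on_uplink(dev_eui)` from the uplink handler. The in-memory dict could be swapped for a database table or bucket if the queue must survive restarts.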
I chose the Python option with gRPC. The following code should serve as a guide. Important notes are listed in the method version_reminder().
Second note: many gateways use opposite IQ inversion settings for uplink and downlink (LoRaWAN downlinks are normally transmitted with inverted IQ). Your edge device must account for this. If it does not, its receiver will never see a downlink frame.
print('import grpc')
try:
    import grpc
except Exception as e:
    raise ImportError('failed import grpc') from e
else:
    print(' grpc okay')

print('import chirpstack_api')
try:
    from chirpstack_api.as_pb.external import api
except Exception as e:
    # A protobuf version mismatch can surface here as a non-ImportError.
    raise ImportError(e) from e
else:
    print(' OK: chirpstack_api')

import os
import sys
import datetime
import time
import traceback
from typing import List, Dict
import binascii
class DeviceParams:
    def __init__(self):
        # Decorate for all end devices.
        self._dev_eui = {"nameX": "9d76b6000011b387", "nameY": "9d76b6000011b389"}
        # Failing all other checks, the fport will also be different for each device.
        self._f_port = {"nameY": 1, "nameX": 2}
        # Local IP of a Chirpstack Application Server
        self._server = "127.0.0.1:8080"
        # The API token from Org. API Keys
        self._api_token = "redacted"
        # Placeholder: the original post never set this attribute, so the
        # CLIcode property raised AttributeError. Fill in as needed.
        self._CLIcode = None

    @property
    def server(self):
        return self._server

    @property
    def dev_eui(self):
        return self._dev_eui

    @property
    def f_port(self):
        return self._f_port

    @property
    def CLIcode(self):
        return self._CLIcode

    @property
    def api_token(self):
        return self._api_token
class loraCommand:
    def __init__(self, devParams):
        self.logfile = 'control_err.log'
        self.silentVR = False
        self.server = devParams.server
        self.dev_eui = devParams.dev_eui
        self.f_port = devParams.f_port
        self.CLIcode = devParams.CLIcode
        self.api_token = devParams.api_token
        # Define the API key meta-data.
        self.auth_token = [("authorization", "Bearer %s" % self.api_token)]
        self.lastdata = bytes(1)

    def diagnostic(self) -> int:
        '''
        diagnostic() will send 6 zero bytes to the dev0 device.
        Returns 0 on success, or a negative count of the steps that failed.
        '''
        alldev = list(self.dev_eui)
        dev0 = alldev[0]
        errd = 0
        print("open grpc channel . . .")
        try:
            # Connect without using TLS. The channel connects lazily, so an
            # unreachable server usually only shows up at Enqueue() below.
            channel = grpc.insecure_channel(self.server)
        except Exception:
            print("failed grpc.insecure_channel()")
            errd = errd - 1
        else:
            print(' OK grpc channel')
        print("open device service client. . .")
        try:
            # Device-queue API client.
            client = api.DeviceQueueServiceStub(channel)
        except Exception:
            print('FAILED dev service client')
            errd = errd - 1
        else:
            print(' OK dev service client')
        print("construct an API request. . .")
        try:
            # Construct request.
            req = api.EnqueueDeviceQueueItemRequest()
        except Exception:
            errd = errd - 1
            print('FAILED API request.')
        else:
            print(' OK API request')
        print("devi queue item reference . . . ")
        try:
            dqi = req.device_queue_item
        except Exception:
            errd = errd - 1
            print("failed devi queue item ref.")
        else:
            print(' OK devi queue item')
            # Only fill in the item if the reference above was obtained.
            dqi.confirmed = False
            dqi.data = bytes(6)  # six zero bytes
            dqi.dev_eui = self.dev_eui[dev0]
            dqi.f_port = self.f_port[dev0]
        print("enqueue downlink frame to ", dev0)
        try:
            resp = client.Enqueue(req, metadata=self.auth_token)
        except Exception:
            errd = errd - 1
            print("failed enqueue downlink.")
        else:
            print(' OK enqueue downlink')
        if errd < 0:
            # Fixed: this is a method, so it must be called through self.
            self.version_reminder(self.silentVR)
        return errd

    @property
    def data(self):
        return self.lastdata

    def version_reminder(self, silent: bool) -> None:
        if not silent:
            print("Connect errors are caused by version mismatches between python and Chirpstack.")
            print("Packages. exact versions required :")
            print("libprotobuf 3.20.1 h4ff587b_0 anaconda")
            print("protobuf 3.20.1 py37h295c915_0 anaconda")
            print("grpcio 1.42.0 py37hce63b2e_0 anaconda")
            print("chirpstack-api 3.12.4 pip")
            print("Servers. exact versions required :")
            print("chirpstack-network-server version 3.16.6")
            print("chirpstack-application-server version 3.17.9")
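Since the version pins in version_reminder() are the most likely source of trouble, it may help to check them programmatically before opening the channel. The following is only a sketch: `REQUIRED`, `installed_versions` and `mismatches` are names made up for this example, the pins are copied from the list above, and only the Python packages are checked (the conda-level libprotobuf and the server versions are not visible this way).

```python
try:
    from importlib import metadata  # Python 3.8+
except ImportError:
    import importlib_metadata as metadata  # backport package for Python 3.7

# Pins copied from version_reminder(); adjust to your own environment.
REQUIRED = {
    "protobuf": "3.20.1",
    "grpcio": "1.42.0",
    "chirpstack-api": "3.12.4",
}


def installed_versions(names):
    """Return {package: installed version string, or None if not installed}."""
    found = {}
    for name in names:
        try:
            found[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            found[name] = None
    return found


def mismatches(required, installed):
    """Return {package: (required, installed)} for every package that differs."""
    return {
        name: (want, installed.get(name))
        for name, want in required.items()
        if installed.get(name) != want
    }
```

Calling `mismatches(REQUIRED, installed_versions(REQUIRED))` gives an empty dict when everything matches, otherwise the offending packages with the required and the installed version side by side.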