From 9751c42d339050b03e35921f2947783c66ca75d0 Mon Sep 17 00:00:00 2001 From: phoenix Date: Fri, 28 May 2021 14:39:03 +0200 Subject: [PATCH] Add mqtt-influxdb gateway Add a mqtt-influxdb gateway, which collects data point from mqtt and pushed them to a configurable influxdb database. --- .gitignore | 3 + Makefile | 10 +- cmd/influxgateway/influxdb.go | 55 +++++ cmd/influxgateway/meteo-mqtt-influxdb.go | 267 +++++++++++++++++++++++ cmd/influxgateway/mqtt.go | 36 +++ go.mod | 3 + go.sum | 14 ++ meteo-influx-gateway.ini.example | 8 + 8 files changed, 392 insertions(+), 4 deletions(-) create mode 100644 cmd/influxgateway/influxdb.go create mode 100644 cmd/influxgateway/meteo-mqtt-influxdb.go create mode 100644 cmd/influxgateway/mqtt.go create mode 100644 meteo-influx-gateway.ini.example diff --git a/.gitignore b/.gitignore index b63b4a9..c6e02c8 100644 --- a/.gitignore +++ b/.gitignore @@ -44,6 +44,9 @@ # Build files build build/ +/meteo +/meteod +/meteo-influx-gateway ## Custom config files *.toml diff --git a/Makefile b/Makefile index 2aa21d1..8ca6c66 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ default: all -all: meteo meteod $(SUBDIRS) +all: meteo meteod influxgateway $(SUBDIRS) install: all install meteo /usr/local/bin/ @@ -24,11 +24,13 @@ req-gateway: ## === Builds =============================================================== ## meteo: cmd/meteo/meteo.go - go build $< + go build $^ meteod: cmd/meteod/meteod.go - go build $< + go build $^ gateway: cmd/gateway/gateway.go - go build $< + go build $^ +influxgateway: cmd/influxgateway/meteo-mqtt-influxdb.go cmd/influxgateway/influxdb.go cmd/influxgateway/mqtt.go + go build -o meteo-influx-gateway $^ ## === Tests ================================================================ ## diff --git a/cmd/influxgateway/influxdb.go b/cmd/influxgateway/influxdb.go new file mode 100644 index 0000000..6fd1959 --- /dev/null +++ b/cmd/influxgateway/influxdb.go @@ -0,0 +1,55 @@ +package main + +import ( + "net/url" + "time" + + client "github.com/influxdata/influxdb1-client" +) + +type InfluxDB struct { + database string + client *client.Client +} + +func ConnectInfluxDB(remote string, username string, password string, database string) (InfluxDB, error) { + var influx InfluxDB + influx.database = database + + host, err := url.Parse(remote) + if err != nil { + return influx, err + } + conf := client.Config{ + URL: *host, + Username: username, + Password: password, + } + influx.client, err = client.NewClient(conf) + return influx, err +} + +// Ping the InfluxDB server +func (influx *InfluxDB) Ping() (time.Duration, string, error) { + return influx.client.Ping() +} + +// Write a measurement into the database +func (influx *InfluxDB) Write(measurement string, tags map[string]string, fields map[string]interface{}) error { + point := client.Point{ + Measurement: measurement, + Tags: tags, + Fields: fields, + //Time: time.Now(), + Precision: "s", + } + pts := make([]client.Point, 0) + pts = append(pts, point) + bps := client.BatchPoints{ + Points: pts, + Database: influx.database, + //RetentionPolicy: "default", + } + _, err := influx.client.Write(bps) + return err +} diff --git a/cmd/influxgateway/meteo-mqtt-influxdb.go b/cmd/influxgateway/meteo-mqtt-influxdb.go new file mode 100644 index 0000000..031e41b --- /dev/null +++ b/cmd/influxgateway/meteo-mqtt-influxdb.go @@ -0,0 +1,267 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "gopkg.in/ini.v1" +) + +// Config is the internal singleton configuration for this program +type Config struct { + MqttHost string `ini:"mqtt,remote"` + InfluxHost string `ini:"influxdb,remote"` + InfluxUsername string `ini:"influxdb,username"` + InfluxPassword string `ini:"influxdb,password"` + InfluxDatabase string `ini:"influxdb,database"` + Verbose bool +} + +func (c *Config) loadIni(filename string) error { + cfg, err := ini.Load(filename) + if err != nil { + return err + } + + mqtt := cfg.Section("mqtt") + mqtthost := mqtt.Key("remote").String() + if mqtthost != "" { + c.MqttHost = mqtthost + } + influx := cfg.Section("influxdb") + influxhost := influx.Key("remote").String() + if influxhost != "" { + c.InfluxHost = influxhost + } + influxuser := influx.Key("username").String() + if influxuser != "" { + c.InfluxUsername = influxuser + } + influxpass := influx.Key("password").String() + if influxpass != "" { + c.InfluxPassword = influxpass + } + influxdb := influx.Key("database").String() + if influxdb != "" { + c.InfluxDatabase = influxdb + } + + return nil +} + +var config Config + +var influx InfluxDB + +func assembleJson(node int, data map[string]interface{}) string { + ret := fmt.Sprintf("node %d: {", node) + first := true + for k, v := range data { + if first { + first = false + } else { + ret += ", " + } + ret += fmt.Sprintf("\"%s\":%f", k, v) + } + ret += "}" + return ret +} + +func received(msg mqtt.Message) { + data := make(map[string]interface{}, 0) + if err := json.Unmarshal(msg.Payload(), &data); err != nil { + fmt.Fprintf(os.Stderr, "json unmarshall error: %s\n", err) + return + } + + // We don't log the name, remove it, if present + if _, ok := data["name"]; ok { + delete(data, "name") + } + // ID is taken from the topic + if _, ok := data["id"]; ok { + delete(data, "id") + } + + // Parse node ID from topic + nodeID, err := strconv.Atoi(msg.Topic()[6:]) + if err != nil { + fmt.Fprintf(os.Stderr, "invalid meteo id\n") + return + } + + // Write to InfluxDB + for k, v := range data { + tags := map[string]string{"node": fmt.Sprintf("%d", nodeID)} + f, err := strconv.ParseFloat(fmt.Sprintf("%f", v), 32) + if err != nil { + fmt.Fprintf(os.Stderr, "non-float value received: %s\n", err) + return + } + fields := map[string]interface{}{"value": f} + if err := influx.Write(k, tags, fields); err != nil { + fmt.Fprintf(os.Stderr, "cannot write to influxdb: %s\n", err) + return + } + } + + // OK + if config.Verbose { + fmt.Println(assembleJson(nodeID, data)) + } +} + +// awaits SIGINT or SIGTERM +func awaitTerminationSignal() { + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sigs + fmt.Println(sig) + done <- true + }() + <-done +} + +func printUsage() { + fmt.Println("meteo-influxdb-gateway") + fmt.Printf("Usage: %s [OPTIONS]\n", os.Args[0]) + fmt.Println("OPTIONS") + fmt.Println(" -h,--help Display this help message") + fmt.Println(" -c,--config FILE Load config file") + fmt.Println(" -v, --verbose Verbose output") + fmt.Println(" --mqtt MQTT Set mqtt server") + fmt.Println(" --influx HOST Set influxdb hostname") + fmt.Println(" --username USER Set influxdb username") + fmt.Println(" --password PASS Set influxdb password") + fmt.Println(" --database DB Set influxdb database") + +} + +func fileExists(filename string) bool { + if _, err := os.Stat(filename); os.IsNotExist(err) { + return false + } + return true +} + +func main() { + var err error + // Default settings + config.MqttHost = "127.0.0.1" + config.InfluxHost = "http://127.0.0.1:8086" + config.InfluxUsername = "meteo" + config.InfluxPassword = "" + config.InfluxDatabase = "meteo" + config.Verbose = false + + configFile := "/etc/meteo/meteo-influx-gateway.ini" + if fileExists(configFile) { + if err := config.loadIni(configFile); err != nil { + fmt.Fprintf(os.Stderr, "error loading ini file %s: %s\n", configFile, err) + os.Exit(1) + } + } + + args := os.Args[1:] + for i := 0; i < len(args); i++ { + arg := strings.TrimSpace(args[i]) + if arg == "" { + continue + } else if arg[0] == '-' { + last := i >= len(args)-1 + if arg == "-h" || arg == "--help" { + printUsage() + return + } else if arg == "-c" || arg == "--config" { + if last { + fmt.Fprintf(os.Stderr, "Missing argument: config file\n") + os.Exit(1) + } + i++ + configFile = args[i] + if err := config.loadIni(configFile); err != nil { + fmt.Fprintf(os.Stderr, "error loading ini file %s: %s\n", configFile, err) + os.Exit(1) + } + } else if arg == "-v" || arg == "--verbose" { + config.Verbose = true + } else if arg == "--mqtt" { + if last { + fmt.Fprintf(os.Stderr, "Missing argument: mqtt remote\n") + os.Exit(1) + } + i++ + config.MqttHost = args[i] + } else if arg == "--influx" { + if last { + fmt.Fprintf(os.Stderr, "Missing argument: influx remote\n") + os.Exit(1) + } + i++ + config.InfluxHost = args[i] + } else if arg == "--username" { + if last { + fmt.Fprintf(os.Stderr, "Missing argument: influx username\n") + os.Exit(1) + } + i++ + config.InfluxUsername = args[i] + } else if arg == "--password" { + if last { + fmt.Fprintf(os.Stderr, "Missing argument: influx password\n") + os.Exit(1) + } + i++ + config.InfluxPassword = args[i] + } else if arg == "--database" { + if last { + fmt.Fprintf(os.Stderr, "Missing argument: influx database\n") + os.Exit(1) + } + i++ + config.InfluxDatabase = args[i] + } + } else { + fmt.Fprintf(os.Stderr, "Invalid argument: %s\n", arg) + os.Exit(1) + } + } + + // Connect InfluxDB + influx, err = ConnectInfluxDB(config.InfluxHost, config.InfluxUsername, config.InfluxPassword, config.InfluxDatabase) + if err != nil { + fmt.Fprintf(os.Stderr, "cannot connect to influxdb: %s\n", err) + os.Exit(1) + } + if ping, version, err := influx.Ping(); err != nil { + fmt.Fprintf(os.Stderr, "cannot ping influxdb: %s\n", err) + os.Exit(1) + } else { + if config.Verbose { + fmt.Printf("influxdb connected: v%s - Ping: %d ms\n", version, ping.Milliseconds()) + } + } + + // Connect to mqtt server + if mqtt, err := ConnectMQTT(config.MqttHost, 1883); err != nil { + fmt.Fprintf(os.Stderr, "mqtt error: %s\n", err) + os.Exit(1) + } else { + mqtt.Subscribe("meteo/#", received) + if config.Verbose { + fmt.Println("mqtt connected: " + config.MqttHost) + } + } + fmt.Println("meteo-mqtt-influx gateway is up and running") + + awaitTerminationSignal() +} diff --git a/cmd/influxgateway/mqtt.go b/cmd/influxgateway/mqtt.go new file mode 100644 index 0000000..c91ba19 --- /dev/null +++ b/cmd/influxgateway/mqtt.go @@ -0,0 +1,36 @@ +package main + +import ( + "fmt" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +type MqttReceive func(msg mqtt.Message) + +type MQTT struct { + client mqtt.Client +} + +func ConnectMQTT(remote string, port int) (MQTT, error) { + var ret MQTT + opts := mqtt.NewClientOptions() + remote = fmt.Sprintf("tcp://%s:%d", remote, port) + opts.AddBroker(remote) + opts.AutoReconnect = true + + ret.client = mqtt.NewClient(opts) + token := ret.client.Connect() + for !token.WaitTimeout(5 * time.Second) { + } + return ret, token.Error() +} + +func (mq *MQTT) Subscribe(topic string, callback MqttReceive) error { + token := mq.client.Subscribe(topic, 0, func(client mqtt.Client, msg mqtt.Message) { + callback(msg) + }) + token.Wait() + return token.Error() +} diff --git a/go.mod b/go.mod index 28a84bf..79dd8bb 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,10 @@ require ( github.com/BurntSushi/toml v0.3.1 github.com/eclipse/paho.mqtt.golang v1.2.0 github.com/gorilla/mux v1.7.4 + github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4 github.com/mattn/go-sqlite3 v2.0.3+incompatible + github.com/smartystreets/goconvey v1.6.4 // indirect golang.org/x/net v0.0.0-20200202094626-16171245cfb2 // indirect + gopkg.in/ini.v1 v1.62.0 ) diff --git a/go.sum b/go.sum index 25dd94d..c85df24 100644 --- a/go.sum +++ b/go.sum @@ -2,15 +2,29 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0= github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab h1:HqW4xhhynfjrtEiiSGcQUd6vrK23iMam1FO8rI7mwig= +github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4 h1:G2ztCwXov8mRvP0ZfjE6nAlaCX2XbykaeHdbT6KwDz0= github.com/jacobsa/go-serial v0.0.0-20180131005756-15cf729a72d4/go.mod h1:2RvX5ZjVtsznNZPEt4xwJXNJrM3VTZoQf7V6gk0ysvs= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U= github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= +github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU= +gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/meteo-influx-gateway.ini.example b/meteo-influx-gateway.ini.example new file mode 100644 index 0000000..5f0a9da --- /dev/null +++ b/meteo-influx-gateway.ini.example @@ -0,0 +1,8 @@ +[mqtt] +remote = "127.0.0.1" + +[influxdb] +remote = "http://127.0.0.1:8086" +username = "meteo" +password = "meteo" +database = "meteo"