Remote Monitor Platform Development

2016/06/29

Tags: Go

运行环境

程序运行在阿里云云主机上,跑Gentoo Linux系统。服务器架构如论文和PPT描述。 服务器开放UDP端口5683与设备通信,目前采用自定义协议,以后可以尝试CoAP协议,但目前的GPRS模块通信质量比较差,使用CoAP协议需要考虑。 80端口开放http服务,用nginx反向代理,url结构如nginx配置文件所示。

server {
    listen 80;
    #listen 127.0.0.1;
    server_name localhost;

    access_log /var/log/nginx/localhost.access_log main;
    error_log /var/log/nginx/localhost.error_log info;

    root /var/www/localhost/htdocs;

    location /api {
        proxy_pass http://localhost:8080;
    }

    location /api/events {
        proxy_pass http://localhost:9952;
        proxy_set_header Connection '';
        proxy_http_version 1.1;
        chunked_transfer_encoding off;
        proxy_buffering off;
        proxy_cache off;
    }

    location /node {
        proxy_pass http://localhost:10242;
    }

    location /api/status {
        proxy_pass http://localhost:10242;
    }
}

sse-server.go编写了Sever-Sent Event的实现,通过/tmp/gasdata.sock从udp-server.go获取气体数据,nginx将/api/events路由给sse-server.go。

udp-server.go负责设备通信,它和其他服务器通过Unix Socket交换数据。

templateServer.go负责生成HTML模板。

上述服务器都通过go语言实现

sse-server

go语言的sse实现,很大程度参考了网上的例子Writing a Server Sent Events server in Go

Broker维护了客户端列表,数据都通过chan广播给客户端。

// This happens in a goroutine somewhere else.
broker.Notifier <- eventBytes

Device结构定义了发送给浏览器的json数据格式,Data []float64是气体浓度数组。

package main

import (
    "time"
    "net/http"
    "net"

    "github.com/justinas/alice"
    "os"
    "fmt"

    "github.com/op/go-logging"
    "encoding/binary"
    "encoding/json"
    "math"
)

var log = logging.MustGetLogger("sselog")
var logFormat = logging.MustStringFormatter(
    `%{color}%{time:15:04:05.000} %{shortfunc} | %{level:.4s} %{id:03x}%{color:reset} %{message}`,
)

var bchan = make(chan Device, 20)

type Device struct {
    Id int `json:"id"`
    Data []float64 `json:"data"`
}

// A MessageChan is a channel of channels
// Each connection sends a channel of bytes to a global MessageChan
// The main broker listen() loop listens on new connections on MessageChan
// New event messages are broadcast to all registered connection channels
type MessageChan chan []byte

// A Broker holds open client connections,
// listens for incoming events on its Notifier channel
// and broadcast event data to all registered connections
type Broker struct {

    // Events are pushed to this channel by the main events-gathering routine
    Notifier chan []byte

    // New client connections
    newClients chan chan []byte

    // Closed client connections
    closingClients chan chan []byte

    // Client connections registry
    clients map[chan []byte]bool
}

var broker = NewServer()

// Broker factory
func NewServer() (broker *Broker) {
  // Instantiate a broker
  broker = &Broker{
    Notifier:       make(chan []byte, 1),
    newClients:     make(chan chan []byte),
    closingClients: make(chan chan []byte),
    clients:    make(map[chan []byte]bool),
  }

  // Set it running - listening and broadcasting events
  go broker.listen()

  return
}

// Listen on different channels and act accordingly
func (broker *Broker) listen() {
    for {
        select {
        case s := <-broker.newClients:

            // A new client has connected.
            // Register their message channel
            broker.clients[s] = true
            log.Infof("Client added. %d registered clients", len(broker.clients))
        case s := <-broker.closingClients:

            // A client has dettached and we want to
            // stop sending them messages.
            delete(broker.clients, s)
            log.Infof("Removed client. %d registered clients", len(broker.clients))
        case event := <-broker.Notifier:

            // We got a new event from the outside!
            // Send event to all connected clients
            for clientMessageChan, _ := range broker.clients {
                clientMessageChan <- event
            }
        }
    }

}

func ByteToFloat64(b []byte) float64 {
    bits := binary.BigEndian.Uint64(b)
    return math.Float64frombits(bits)
}

func loggingHandler(next http.Handler) http.Handler {
    fn := func(w http.ResponseWriter, r *http.Request) {
        t1 := time.Now()
        next.ServeHTTP(w, r)
        t2 := time.Now()
        log.Infof("[%s] %q %v\n", r.Method, r.URL.String(), t2.Sub(t1))
    }

    return http.HandlerFunc(fn)
}

func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    // Each connection registers its own message channel with the Broker's connections registry
    messageChan := make(MessageChan)

    // Signal the broker that we have a new connection
    broker.newClients <- messageChan

    // Remove this client from the map of connected clients
    // when this handler exits.
    defer func() {
        broker.closingClients <- messageChan
    }()

    // Listen to connection close and un-register messageChan
    notify := w.(http.CloseNotifier).CloseNotify()

    go func() {
        <-notify
        broker.closingClients <- messageChan
    }()

    for {
        fmt.Fprintf(w, "data: %s\n\n", <-messageChan)
        flusher.Flush()
    }
}

func socketHandler() {
    var socket = "/tmp/gasdata.sock"
    os.Remove(socket)
    conn, err := net.ListenPacket("unixgram", socket)
    if err != nil {
        log.Fatalf("unix socket listen failed: %s\n", err.Error())
    }
    defer conn.Close()
    log.Infof("conn address: %s\n", conn.LocalAddr())

    var buff = make([]byte, 512)
    for {
        _, _, err := conn.ReadFrom(buff)
        if err != nil {
            log.Warningf("unix socket read failed: %s\n", err.Error())
            continue
        }
        var dev Device
        dev.Id = int(buff[0])
        dev.Data = make([]float64, 7)
        for i:=0; i<7; i++ {
            dev.Data[i] = ByteToFloat64(buff[1+4+8*i:])
        }
        //log.Info("new data: %+v\n", dev)
        jstr, err := json.Marshal(dev)
        if err != nil {
            log.Warningf("json marshal failed: %s\n", err.Error())
            continue
        }
        broker.Notifier <- jstr
    }
}

func main() {
    logging.SetFormatter(logFormat)

    go socketHandler()

    commonHandlers := alice.New(loggingHandler)
    http.Handle("/api/events/data.get", commonHandlers.Then(broker))
    http.ListenAndServe("localhost:9952", nil)
}

udp-server

handleClient获取新的数据包,根据帧头分包,交给payloadHandler处理。payloadHandler根据devid维护设备的IP地址,sessionConn[devid]表示了与devid对应的对话。sessionHandler负责发送数据给设备,一旦设备发送的心跳包超时,则将设备的状态(client[id])更新为false。

package main

import (
    "net"
    "bytes"
    "encoding/json"
    "encoding/binary"
    "math"
    "github.com/op/go-logging"
    "os"
    "time"
)

type Dev struct {
    Id int `json:"id"`
    Cmd string `json:"cmd"`
}

type Status struct {
    Id int `json:"id"`
    Status int `json:"status"`
    Type int `json:"type"`
}

var log = logging.MustGetLogger("gaslog")
var logFormat = logging.MustStringFormatter(
    `%{color}%{time:15:04:05.000} %{shortfunc} | %{level:.4s} %{id:03x}%{color:reset} %{message}`,
)
var socket net.Conn
var client = make(map[int]bool)
var clientAddr = make([]net.Addr, 512)
var sessionConn = make([]*net.UDPConn, 512)
var sessionChan [512]chan []byte

func ByteToFloat64(b []byte) float64 {
    bits := binary.BigEndian.Uint64(b)
    return math.Float64frombits(bits)
}

func payloadHandler(conn *net.UDPConn, addr *net.UDPAddr, buff []byte) {
    //log.Infof("new payload: %+v\n", buff)

    msgtype, devid := buff[0], int(buff[1])
    if client[devid] == false {
        client[devid] =true
        clientAddr[devid] = addr
        sessionConn[devid] = conn
        go sessionHandler(devid)
    } else if clientAddr[devid] != addr {
        clientAddr[devid] = addr
        sessionConn[devid] = conn
    }

    switch msgtype {
    case 0x0: // heartbeat
        var payload = make([]byte, 2)
        payload[0] = 0
        payload[1] = 1
        sessionChan[devid] <- payload
        //conn.WriteTo([]byte("OK"), addr)
    case 0x1: // gas data
        //log.Noticef("buff: %+v\n", buff[2:60])
        socket.Write(buff[1:])
        gasdata := make([]float64, 7)
        for i:=0; i<7; i++ {
            gasdata[i] = ByteToFloat64(buff[2+4+8*i:])
        }
        log.Infof("%d: %f %f %f %f %f %f %f\n", devid, gasdata[0], gasdata[1], gasdata[2], gasdata[3], gasdata[4], gasdata[5], gasdata[6])
        //conn.WriteToUDP([]byte("OK"), addr)
    case 0x2:
        status := buff[2]
        log.Infof("%d: status changed: %d\n", devid, status)
    }
}

func handleClient(conn *net.UDPConn) {
    var buff = make([]byte, 512)
    n, addr, err := conn.ReadFromUDP(buff[0:])
    if err != nil {
        return
    }

    payload := make([]byte, n)
    copy(payload, buff)

    //log.Infof("payload.length=%d\n", n)
    for i := bytes.Index(payload[0:], []byte("hjhee")); i != -1; i = bytes.Index(payload[0:], []byte("hjhee")) {
        //log.Infof("payload: %+v\n", payload[i+5:])
        go payloadHandler(conn, addr, payload[i+5:])
        payload = payload[i+5:]
    }
}

func sessionHandler(id int) {
    for {
        select {
        case b := <-sessionChan[id]:
            switch b[0] {
            case 0x0: // heartbeat
                log.Infof("new heartbeat[id=%d]: %s\n", id, clientAddr[id])
                conn := sessionConn[id]
                conn.WriteTo([]byte("OK"), clientAddr[id])
            case 0x1: // device status update
                conn := sessionConn[id]
                conn.WriteTo(b, clientAddr[id])
                log.Infof("status update[addr=%s]: %d\n", clientAddr[id], b[1])
            case 0x3: // status query
                conn := sessionConn[id]
                conn.WriteTo(b, clientAddr[id])
            default:
                log.Infof("invalid data[id=%d]\n", id)
            }
        case <-time.After(30 * time.Second):
            log.Noticef("heartbeat timeout: id[%d]\n", id)
            client[id] = false
            return
        }
    }
}

func devHandler() {
    var devSocket = "/tmp/devstatus.sock"
    os.Remove(devSocket)
    conn, err := net.ListenPacket("unixgram", devSocket)
    if err != nil {
        log.Fatalf("devSocket create failed: %s\n", err.Error())
    }
    defer conn.Close()
    log.Infof("devSocket local addr: %s\n", conn.LocalAddr())

    var buff = make([]byte, 512)
    for {
        n, _, err := conn.ReadFrom(buff)
        if err != nil {
            log.Warningf("devSocket read failed: %s\n", err.Error())
            continue
        }
        reader := bytes.NewReader(buff)
        decoder := json.NewDecoder(reader)
        var devCmd Dev
        err = decoder.Decode(&devCmd)
        if err != nil {
            log.Warningf("json decode failed: %s(error: %s)\n", string(buff[0:n]), err.Error())
            continue
        }
        log.Infof("new command: %+v\n", devCmd)
        switch devCmd.Cmd {
        case "devStart":
            var payload = make([]byte, 2)
            payload[0] = 1
            payload[1] = 0x3
            sessionChan[devCmd.Id] <- payload
        case "devStop":
            var payload = make([]byte, 2)
            payload[0] = 1
            payload[1] = 0x0
            sessionChan[devCmd.Id] <- payload
        case "devQuery":
            var payload = make([]byte, 1)
            payload[0] =2
            if client[devCmd.Id] {
                sessionChan[devCmd.Id] <- payload
            } else {
                status := Status{Id: devCmd.Id, Status: -1, Type: -1}
                statusStr, _ := json.Marshal(status)
                log.Infof("status[id=%d]=%s\n", status.Id, statusStr)
            }
            log.Infof("new Query[id=%d]\n", devCmd.Id)
        }
    }
}

func main() {
    logging.SetFormatter(logFormat)

    for i:=0; i<512; i++ {
        sessionChan[i] = make(chan []byte, 5)
    }

    go devHandler()

    var err error
    socket, err = net.Dial("unixgram", "/tmp/gasdata.sock")
    if err != nil {
        log.Fatal("Unix Domain Socket create failed")
    }
    defer socket.Close()
    udpAddr, _ := net.ResolveUDPAddr("udp4", ":5683")
    conn, _ := net.ListenUDP("udp", udpAddr)
    defer conn.Close()
    for {
        handleClient(conn)
    }
}

templateServer

templateServer负责生成HTML模板,可以预处理加快生成速度。另一方面也提供REST API,/api/status/:id用于给设备发送命令变更工作状态,GET操作查询,POST操作更新。

package main

import (
    "net"
    "net/http"
    "html/template"
    "github.com/julienschmidt/httprouter"
    "github.com/op/go-logging"
    "io/ioutil"
    "strconv"
    "encoding/json"
)

var log = logging.MustGetLogger("sselog")
var logFormat = logging.MustStringFormatter(
    `%{color}%{time:15:04:05.000} %{shortfunc} | %{level:.4s} %{id:03x}%{color:reset} %{message}`,
)

var devSocket net.Conn

type Dev struct {
    Id int `json:"id"`
    Cmd string `json:"cmd"`
}

func newTemplateHandler() (func(w http.ResponseWriter, r *http.Request, ps httprouter.Params), error) {
/*
    tmpl, err := template.ParseFiles("templates/layout.html")
    if err != nil {
        return nil, err
    }
*/
    return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
        id, _ := strconv.Atoi(ps.ByName("id"))
        tmpl, err := template.ParseFiles("templates/layout.html")
        if err != nil {
            log.Errorf("template Parse failed: %s\n", err.Error())
            return
        }
        err = tmpl.Execute(w, Dev{Id: id})
        if err != nil {
            log.Errorf("template execute failed: %s\n", err.Error())
        }
    }, nil
}

func statusHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
    id, _ := strconv.Atoi(ps.ByName("id"))
    if r.Method == "POST" {
        result, _ := ioutil.ReadAll(r.Body)
        r.Body.Close()
        devSocket.Write([]byte(result))
        log.Infof("status.post: %s\n", result)
    }
    if r.Method == "GET" {
        devCmd := Dev{Id: id, Cmd: "devQuery"}
        cmd, _ := json.Marshal(devCmd)
        devSocket.Write([]byte(cmd))
    }
}

func main() {
    logging.SetFormatter(logFormat)

    var err error
    devSocket, err = net.Dial("unixgram", "/tmp/devstatus.sock")
    if err != nil {
        log.Fatalf("socket dial failed: %s\n", err.Error())
    }
    templateHandler, err := newTemplateHandler()
    if err != nil {
        log.Fatalf("template parse failed: %s\n", err.Error())
    }

    router := httprouter.New()
    router.GET("/node/:id", templateHandler)
    router.POST("/api/status/:id", statusHandler)
    router.GET("/api/status/:id", statusHandler)
    log.Fatal(http.ListenAndServe("localhost:10242", router))
}

HTML 模板

<!DOCTYPE html>
<html>
<head>
	<meta charset="utf-8">
	<title>Dev{{.Id}}</title>
	<link rel="stylesheet" href="/css/chartist.min.css">
	<script src="/js/jquery-2.2.4.min.js"></script>
	<script src="/js/chartist.min.js"></script>
	<script src="/js/chartist-plugin-axistitle.js"></script>
	<script>
$(document).ready(function(){
	var data = {
		// A labels array that can contain any sort of values
		labels: [],
		// Our series array that contains series objects or in this case series data arrays
		series: [{
			name: "SO2",
			data: []
		}, {
			name: "NO2",
			data: []
		}, {
			name: "O3",
			data: []
		}, {
			name: "CO",
			data: []
		}, {
			name: "RH",
			data: []
		}, {
			name: "Temp",
			data: []
		}]
	};

	// Create a new line chart object where as first parameter we pass in a selector
	// that is resolving to our chart container element. The Second parameter
	// is the actual data object.
	chart = new Chartist.Line('.ct-chart', data, {
		chartPadding: {
			top: 30,
			right: 0,
			bottom: 30,
			left: 20
		},
		plugins: [
			Chartist.plugins.ctAxisTitle({
				axisX: {
					axisTitle: 'Time (mins)',
					axisClass: 'ct-axis-title',
					offset: {
						x: 0,
						y: 50
					},
					textAnchor: 'middle'
				},
				axisY: {
					axisTitle: 'Concentrations (ppb)',
					axisClass: 'ct-axis-title',
					offset: {
						x: 0,
						y: -1
					},
					flipTitle: false
				}
			})
		]
	});
	
	var source = new EventSource("/api/events/data.get");
	var indx = 0;
	var devid = window.location.pathname.split('/').pop();
	source.onmessage = function(evt) {
		var payload = JSON.parse(evt.data);
		if(payload.id != devid){
			return;
		}
		chart.data.labels.push(indx);
		indx++;
		if(chart.data.labels.length>10)
			chart.data.labels.shift();
		for(var i=0; i<chart.data.series.length; i++){
			var s = chart.data.series[i];
			s.data.push(Math.sin(Math.random()*2*Math.PI));
			if(s.data.length>10){
				s.data.shift();
			}
		}
		chart.update();
		
		var row = '';
		var row = '<tr><th>' + Math.round(payload.data[0]) + '</th>';
		for(var i=1; i<7; i++){
			row += '<th>' + payload.data[i].toPrecision(4) + '</th>';
		}
		$('#tbCon tbody').prepend(row);
		var rows = $('#tbCon tbody tr');
		if(rows.length > 10){
			rows.last().remove();
		}
	}
});
</script>
<script>
$(document).ready(function(){
	var devid = window.location.pathname.split('/').pop();
	
	$('#devCmd').submit(function(evt){
		evt.preventDefault();
		var cmd = $(document.activeElement).attr('name');
		var serize = JSON.stringify({id: parseInt(devid), cmd: cmd});
		$.post("/api/status/" + devid, serize);
		console.log('request sent:' + serize);
	});
	
	var devStatusSource = new EventSource("/api/event/status/" + devid);
	devStatusSource.onmessage = function(evt) {
		var payload = JSON.parse(evt.data);
		
		switch(payload.status) {
		case 0:
			$('#devStatus').text("停止采集");
			break;
		case 1:
			$('#devStatus').text("正在采集");
			break;
		default:
			$('#devStatus').text("无法获取");
			break;
		}
	}
	
	var source = new EventSource("/api/events/data.get");
	source.onmessage = function(evt) {
		var payload = JSON.parse(evt.data);
		console.log("[status]new event:" + payload);
	}
});
</script>
</head>
<body>
<header>
	<div class="hd-title" style="text-align: center;">检测节点{{.Id}}</div>
</header>
<main>
	<section>
		<div class="devStatus" style="text-align: center;">
			<form id="devCmd">
					<label>运行状态:</label>
					<label id="devStatus">无法获取设备信息</label>
					<input class="devBut" type="submit" name="devStart" value="Start" />
					<input class="devBut" type="submit" name="devStop" value="Stop" />
			</form>
		</div>
	</section>
	<section>
		<div class="ct-chart ct-major-eleventh" style="width: 80%; margin-left: auto; margin-right: auto;"></div>
	</section>
	<section>
		<table id="tbCon" style="width: 60%; margin-left: auto; margin-right: auto;">
			<colgroup>
				<col width="10%">
				<col width="15%">
				<col width="15%">
				<col width="15%">
				<col width="15%">
				<col width="15%">
				<col width="15%">
			</colgroup>
			<thead>
				<tr>
					<th>Time</th>
					<th>SO<sub>2</sub></th>
					<th>NO<sub>2</sub></th>
					<th>O<sub>3</sub></th>
					<th>CO</th>
					<th>RH</th>
					<th>Temp</th>
				</tr>
			</thead>
			<tbody>
			</tbody>
		</table>
	</section>
</main>
</body>
</html>