2017-04-17 10:33:09 +05:30

222 lines
5.9 KiB
Go

// Copyright (C) 2016 Nippon Telegraph and Telephone Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/influxdata/influxdb/client/v2"
"github.com/osrg/gobgp/packet/bgp"
"github.com/osrg/gobgp/table"
"time"
)
type Collector struct {
s *BgpServer
url string
dbName string
interval uint64
client client.Client
}
const (
MEATUREMENT_UPDATE = "update"
MEATUREMENT_PEER = "peer"
MEATUREMENT_TABLE = "table"
)
func (c *Collector) writePoints(points []*client.Point) error {
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: c.dbName,
Precision: "ms",
})
bp.AddPoints(points)
return c.client.Write(bp)
}
func (c *Collector) writePeer(msg *WatchEventPeerState) error {
var state string
switch msg.State {
case bgp.BGP_FSM_ESTABLISHED:
state = "Established"
case bgp.BGP_FSM_IDLE:
state = "Idle"
default:
return fmt.Errorf("unexpected fsm state %v", msg.State)
}
tags := map[string]string{
"PeerAddress": msg.PeerAddress.String(),
"PeerAS": fmt.Sprintf("%v", msg.PeerAS),
"State": state,
}
fields := map[string]interface{}{
"PeerID": msg.PeerID.String(),
}
pt, err := client.NewPoint(MEATUREMENT_PEER, tags, fields, msg.Timestamp)
if err != nil {
return err
}
return c.writePoints([]*client.Point{pt})
}
func path2data(path *table.Path) (map[string]interface{}, map[string]string) {
fields := map[string]interface{}{
"RouterID": path.GetSource().ID,
}
if asPath := path.GetAsPath(); asPath != nil {
fields["ASPath"] = asPath.String()
}
if origin, err := path.GetOrigin(); err == nil {
typ := "-"
switch origin {
case bgp.BGP_ORIGIN_ATTR_TYPE_IGP:
typ = "i"
case bgp.BGP_ORIGIN_ATTR_TYPE_EGP:
typ = "e"
case bgp.BGP_ORIGIN_ATTR_TYPE_INCOMPLETE:
typ = "?"
}
fields["Origin"] = typ
}
if med, err := path.GetMed(); err == nil {
fields["Med"] = med
}
tags := map[string]string{
"PeerAddress": path.GetSource().Address.String(),
"PeerAS": fmt.Sprintf("%v", path.GetSource().AS),
"Timestamp": path.GetTimestamp().String(),
}
if nexthop := path.GetNexthop(); len(nexthop) > 0 {
fields["NextHop"] = nexthop.String()
}
if originAS := path.GetSourceAs(); originAS != 0 {
fields["OriginAS"] = fmt.Sprintf("%v", originAS)
}
if err := bgp.FlatUpdate(tags, path.GetNlri().Flat()); err != nil {
log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("NLRI FlatUpdate failed")
}
for _, p := range path.GetPathAttrs() {
if err := bgp.FlatUpdate(tags, p.Flat()); err != nil {
log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("PathAttr FlatUpdate failed")
}
}
return fields, tags
}
func (c *Collector) writeUpdate(msg *WatchEventUpdate) error {
if len(msg.PathList) == 0 {
// EOR
return nil
}
now := time.Now()
points := make([]*client.Point, 0, len(msg.PathList))
for _, path := range msg.PathList {
fields, tags := path2data(path)
tags["Withdraw"] = fmt.Sprintf("%v", path.IsWithdraw)
pt, err := client.NewPoint(MEATUREMENT_UPDATE, tags, fields, now)
if err != nil {
return fmt.Errorf("failed to write update, %v", err)
}
points = append(points, pt)
}
return c.writePoints(points)
}
func (c *Collector) writeTable(msg *WatchEventAdjIn) error {
now := time.Now()
points := make([]*client.Point, 0, len(msg.PathList))
for _, path := range msg.PathList {
fields, tags := path2data(path)
pt, err := client.NewPoint(MEATUREMENT_TABLE, tags, fields, now)
if err != nil {
return fmt.Errorf("failed to write table, %v", err)
}
points = append(points, pt)
}
return c.writePoints(points)
}
func (c *Collector) loop() {
w := c.s.Watch(WatchPeerState(true), WatchUpdate(false))
defer w.Stop()
ticker := func() *time.Ticker {
if c.interval == 0 {
return &time.Ticker{}
}
return time.NewTicker(time.Second * time.Duration(c.interval))
}()
for {
select {
case <-ticker.C:
w.Generate(WATCH_EVENT_TYPE_PRE_UPDATE)
case ev := <-w.Event():
switch msg := ev.(type) {
case *WatchEventUpdate:
if err := c.writeUpdate(msg); err != nil {
log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("Failed to write update event message")
}
case *WatchEventPeerState:
if err := c.writePeer(msg); err != nil {
log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("Failed to write state changed event message")
}
case *WatchEventAdjIn:
if err := c.writeTable(msg); err != nil {
log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("Failed to write Adj-In event message")
}
}
}
}
}
func NewCollector(s *BgpServer, url, dbName string, interval uint64) (*Collector, error) {
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: url,
})
if err != nil {
return nil, err
}
_, _, err = c.Ping(0)
if err != nil {
log.Error("can not connect to InfluxDB")
log.WithFields(log.Fields{"Type": "collector", "Error": err}).Error("Failed to connect to InfluxDB")
return nil, err
}
q := client.NewQuery("CREATE DATABASE "+dbName, "", "")
if response, err := c.Query(q); err != nil || response.Error() != nil {
log.WithFields(log.Fields{"Type": "collector", "Error": err}).Errorf("Failed to create database:%s", dbName)
return nil, err
}
collector := &Collector{
s: s,
url: url,
dbName: dbName,
interval: interval,
client: c,
}
go collector.loop()
return collector, nil
}