Merge pull request #22 from jsimonetti/upstream_concurrency

Use upstreams goroutine safe execution
This commit is contained in:
Jeroen Simonetti 2019-04-10 16:59:49 +02:00 committed by GitHub
commit c6cbc0b2e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 49 deletions

View File

@ -110,8 +110,8 @@ const (
// New creates a new address using the AddressMessage information.
func (a *AddressService) New(req *AddressMessage) error {
flags := netlink.Request
_, err := a.c.Send(req, RTM_NEWADDR, flags)
flags := netlink.Request | netlink.Create | netlink.Acknowledge | netlink.Excl
_, err := a.c.Execute(req, RTM_NEWADDR, flags)
if err != nil {
return err
}
@ -128,8 +128,8 @@ func (a *AddressService) Delete(address net.IP, index uint32) error {
},
}
flags := netlink.Request
_, err := a.c.Send(req, RTM_DELADDR, flags)
flags := netlink.Request | netlink.Acknowledge
_, err := a.c.Execute(req, RTM_DELADDR, flags)
if err != nil {
return err
}

107
conn.go
View File

@ -27,6 +27,7 @@ type conn interface {
Close() error
Send(m netlink.Message) (netlink.Message, error)
Receive() ([]netlink.Message, error)
Execute(m netlink.Message) ([]netlink.Message, error)
}
// Dial dials a route netlink connection. Config specifies optional
@ -38,11 +39,15 @@ func Dial(config *netlink.Config) (*Conn, error) {
return nil, err
}
return newConn(c), nil
return NewConn(c), nil
}
// newConn is the internal constructor for Conn, used in tests.
func newConn(c conn) *Conn {
// NewConn creates a Conn that wraps an existing *netlink.Conn for
// generic netlink communications.
//
// NewConn is primarily useful for tests. Most applications should use
// Dial instead.
func NewConn(c conn) *Conn {
rtc := &Conn{
c: c,
}
@ -92,11 +97,66 @@ func (c *Conn) Receive() ([]Message, []netlink.Message, error) {
return nil, nil, err
}
return messageUnmarshall(msgs)
rtmsgs, err := unpackMessages(msgs)
if err != nil {
return nil, nil, err
}
return rtmsgs, msgs, nil
}
// messageUnmarshall will unmarshal the message based on its type
func messageUnmarshall(msgs []netlink.Message) ([]Message, []netlink.Message, error) {
// Execute sends a single Message to netlink using Send, receives one or more
// replies using Receive, and then checks the validity of the replies against
// the request using netlink.Validate.
//
// Execute acquires a lock for the duration of the function call which blocks
// concurrent calls to Send and Receive, in order to ensure consistency between
// generic netlink request/reply messages.
//
// See the documentation of Send, Receive, and netlink.Validate for details
// about each function.
func (c *Conn) Execute(m Message, family uint16, flags netlink.HeaderFlags) ([]Message, error) {
nm, err := packMessage(m, family, flags)
if err != nil {
return nil, err
}
msgs, err := c.c.Execute(nm)
if err != nil {
return nil, err
}
return unpackMessages(msgs)
}
//Message is the interface used for passing around different kinds of rtnetlink messages
type Message interface {
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
rtMessage()
}
// packMessage packs a rtnetlink Message into a netlink.Message with the
// appropriate rtnetlink family and netlink flags.
func packMessage(m Message, family uint16, flags netlink.HeaderFlags) (netlink.Message, error) {
nm := netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType(family),
Flags: flags,
},
}
mb, err := m.MarshalBinary()
if err != nil {
return netlink.Message{}, err
}
nm.Data = mb
return nm, nil
}
// unpackMessages unpacks rtnetlink Messages from a slice of netlink.Messages.
func unpackMessages(msgs []netlink.Message) ([]Message, error) {
lmsgs := make([]Message, 0, len(msgs))
for _, nm := range msgs {
@ -125,41 +185,10 @@ func messageUnmarshall(msgs []netlink.Message) ([]Message, []netlink.Message, er
}
if err := (m).UnmarshalBinary(nm.Data); err != nil {
return nil, nil, err
return nil, err
}
lmsgs = append(lmsgs, m)
}
return lmsgs, msgs, nil
}
// Execute sends a single Message to netlink using Conn.Send, receives one or
// more replies using Conn.Receive, and then checks the validity of the replies
// against the request using netlink.Validate.
//
// See the documentation of Conn.Send, Conn.Receive, and netlink.Validate for
// details about each function.
func (c *Conn) Execute(m Message, family uint16, flags netlink.HeaderFlags) ([]Message, error) {
req, err := c.Send(m, family, flags)
if err != nil {
return nil, err
}
msgs, replies, err := c.Receive()
if err != nil {
return nil, err
}
if err := netlink.Validate(req, replies); err != nil {
return nil, err
}
return msgs, nil
}
//Message is the interface used for passing around different kinds of rtnetlink messages
type Message interface {
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
rtMessage()
return lmsgs, nil
}

View File

@ -171,7 +171,7 @@ func TestConnReceive(t *testing.T) {
func testConn(t *testing.T) (*Conn, *testNetlinkConn) {
c := &testNetlinkConn{}
return newConn(c), c
return NewConn(c), c
}
type testNetlinkConn struct {
@ -190,11 +190,17 @@ func (c *testNetlinkConn) Receive() ([]netlink.Message, error) {
return c.receive, nil
}
func (c *testNetlinkConn) Execute(m netlink.Message) ([]netlink.Message, error) {
c.send = m
return c.receive, nil
}
type noopConn struct{}
func (c *noopConn) Close() error { return nil }
func (c *noopConn) Send(m netlink.Message) (netlink.Message, error) { return netlink.Message{}, nil }
func (c *noopConn) Receive() ([]netlink.Message, error) { return nil, nil }
func (c *noopConn) Close() error { return nil }
func (c *noopConn) Send(_ netlink.Message) (netlink.Message, error) { return netlink.Message{}, nil }
func (c *noopConn) Receive() ([]netlink.Message, error) { return nil, nil }
func (c *noopConn) Execute(m netlink.Message) ([]netlink.Message, error) { return nil, nil }
func mustMarshal(m encoding.BinaryMarshaler) []byte {
b, err := m.MarshalBinary()

View File

@ -129,8 +129,8 @@ func (r *RouteService) Add(req *RouteMessage) error {
// Delete existing route
func (r *RouteService) Delete(req *RouteMessage) error {
flags := netlink.Request
_, err := r.c.Send(req, RTM_DELROUTE, flags)
flags := netlink.Request | netlink.Acknowledge
_, err := r.c.Execute(req, RTM_DELROUTE, flags)
if err != nil {
return err
}