diff --git a/vendor/github.com/samuel/go-zookeeper/zk/cluster_test.go b/vendor/github.com/samuel/go-zookeeper/zk/cluster_test.go new file mode 100644 index 0000000000..445f305c82 --- /dev/null +++ b/vendor/github.com/samuel/go-zookeeper/zk/cluster_test.go @@ -0,0 +1,294 @@ +package zk + +import ( + "sync" + "testing" + "time" +) + +type logWriter struct { + t *testing.T + p string +} + +func (lw logWriter) Write(b []byte) (int, error) { + lw.t.Logf("%s%s", lw.p, string(b)) + return len(b), nil +} + +func TestBasicCluster(t *testing.T) { + ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk1, err := ts.Connect(0) + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk1.Close() + zk2, err := ts.Connect(1) + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk2.Close() + + time.Sleep(time.Second * 5) + + if _, err := zk1.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil { + t.Fatalf("Create failed on node 1: %+v", err) + } + if by, _, err := zk2.Get("/gozk-test"); err != nil { + t.Fatalf("Get failed on node 2: %+v", err) + } else if string(by) != "foo-cluster" { + t.Fatal("Wrong data for node 2") + } +} + +// If the current leader dies, then the session is reestablished with the new one. +func TestClientClusterFailover(t *testing.T) { + tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer tc.Stop() + zk, evCh, err := tc.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + sl := NewStateLogger(evCh) + + hasSessionEvent1 := sl.NewWatcher(sessionStateMatcher(StateHasSession)).Wait(8 * time.Second) + if hasSessionEvent1 == nil { + t.Fatalf("Failed to connect and get session") + } + + if _, err := zk.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil { + t.Fatalf("Create failed on node 1: %+v", err) + } + + hasSessionWatcher2 := sl.NewWatcher(sessionStateMatcher(StateHasSession)) + + // Kill the current leader + tc.StopServer(hasSessionEvent1.Server) + + // Wait for the session to be reconnected with the new leader. + hasSessionWatcher2.Wait(8 * time.Second) + if hasSessionWatcher2 == nil { + t.Fatalf("Failover failed") + } + + if by, _, err := zk.Get("/gozk-test"); err != nil { + t.Fatalf("Get failed on node 2: %+v", err) + } else if string(by) != "foo-cluster" { + t.Fatal("Wrong data for node 2") + } +} + +// If a ZooKeeper cluster looses quorum then a session is reconnected as soon +// as the quorum is restored. +func TestNoQuorum(t *testing.T) { + tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer tc.Stop() + zk, evCh, err := tc.ConnectAllTimeout(4 * time.Second) + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + sl := NewStateLogger(evCh) + + // Wait for initial session to be established + hasSessionEvent1 := sl.NewWatcher(sessionStateMatcher(StateHasSession)).Wait(8 * time.Second) + if hasSessionEvent1 == nil { + t.Fatalf("Failed to connect and get session") + } + initialSessionID := zk.sessionID + DefaultLogger.Printf(" Session established: id=%d, timeout=%d", zk.sessionID, zk.sessionTimeoutMs) + + // Kill the ZooKeeper leader and wait for the session to reconnect. + DefaultLogger.Printf(" Kill the leader") + hasSessionWatcher2 := sl.NewWatcher(sessionStateMatcher(StateHasSession)) + tc.StopServer(hasSessionEvent1.Server) + hasSessionEvent2 := hasSessionWatcher2.Wait(8 * time.Second) + if hasSessionEvent2 == nil { + t.Fatalf("Failover failed") + } + + // Kill the ZooKeeper leader leaving the cluster without quorum. + DefaultLogger.Printf(" Kill the leader") + tc.StopServer(hasSessionEvent2.Server) + + // Make sure that we keep retrying connecting to the only remaining + // ZooKeeper server, but the attempts are being dropped because there is + // no quorum. + DefaultLogger.Printf(" Retrying no luck...") + var firstDisconnect *Event + begin := time.Now() + for time.Now().Sub(begin) < 6*time.Second { + disconnectedEvent := sl.NewWatcher(sessionStateMatcher(StateDisconnected)).Wait(4 * time.Second) + if disconnectedEvent == nil { + t.Fatalf("Disconnected event expected") + } + if firstDisconnect == nil { + firstDisconnect = disconnectedEvent + continue + } + if disconnectedEvent.Server != firstDisconnect.Server { + t.Fatalf("Disconnect from wrong server: expected=%s, actual=%s", + firstDisconnect.Server, disconnectedEvent.Server) + } + } + + // Start a ZooKeeper node to restore quorum. + hasSessionWatcher3 := sl.NewWatcher(sessionStateMatcher(StateHasSession)) + tc.StartServer(hasSessionEvent1.Server) + + // Make sure that session is reconnected with the same ID. + hasSessionEvent3 := hasSessionWatcher3.Wait(8 * time.Second) + if hasSessionEvent3 == nil { + t.Fatalf("Session has not been reconnected") + } + if zk.sessionID != initialSessionID { + t.Fatalf("Wrong session ID: expected=%d, actual=%d", initialSessionID, zk.sessionID) + } + + // Make sure that the session is not dropped soon after reconnect + e := sl.NewWatcher(sessionStateMatcher(StateDisconnected)).Wait(6 * time.Second) + if e != nil { + t.Fatalf("Unexpected disconnect") + } +} + +func TestWaitForClose(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk, err := ts.Connect(0) + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + timeout := time.After(30 * time.Second) +CONNECTED: + for { + select { + case ev := <-zk.eventChan: + if ev.State == StateConnected { + break CONNECTED + } + case <-timeout: + zk.Close() + t.Fatal("Timeout") + } + } + zk.Close() + for { + select { + case _, ok := <-zk.eventChan: + if !ok { + return + } + case <-timeout: + t.Fatal("Timeout") + } + } +} + +func TestBadSession(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + + zk.conn.Close() + time.Sleep(time.Millisecond * 100) + + if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } +} + +type EventLogger struct { + events []Event + watchers []*EventWatcher + lock sync.Mutex + wg sync.WaitGroup +} + +func NewStateLogger(eventCh <-chan Event) *EventLogger { + el := &EventLogger{} + el.wg.Add(1) + go func() { + defer el.wg.Done() + for event := range eventCh { + el.lock.Lock() + for _, sw := range el.watchers { + if !sw.triggered && sw.matcher(event) { + sw.triggered = true + sw.matchCh <- event + } + } + DefaultLogger.Printf(" event received: %v\n", event) + el.events = append(el.events, event) + el.lock.Unlock() + } + }() + return el +} + +func (el *EventLogger) NewWatcher(matcher func(Event) bool) *EventWatcher { + ew := &EventWatcher{matcher: matcher, matchCh: make(chan Event, 1)} + el.lock.Lock() + el.watchers = append(el.watchers, ew) + el.lock.Unlock() + return ew +} + +func (el *EventLogger) Events() []Event { + el.lock.Lock() + transitions := make([]Event, len(el.events)) + copy(transitions, el.events) + el.lock.Unlock() + return transitions +} + +func (el *EventLogger) Wait4Stop() { + el.wg.Wait() +} + +type EventWatcher struct { + matcher func(Event) bool + matchCh chan Event + triggered bool +} + +func (ew *EventWatcher) Wait(timeout time.Duration) *Event { + select { + case event := <-ew.matchCh: + return &event + case <-time.After(timeout): + return nil + } +} + +func sessionStateMatcher(s State) func(Event) bool { + return func(e Event) bool { + return e.Type == EventSession && e.State == s + } +} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/conn.go b/vendor/github.com/samuel/go-zookeeper/zk/conn.go index 0df5e87556..7b4d2a3513 100644 --- a/vendor/github.com/samuel/go-zookeeper/zk/conn.go +++ b/vendor/github.com/samuel/go-zookeeper/zk/conn.go @@ -596,7 +596,7 @@ func (c *Conn) recvLoop(conn net.Conn) error { if res.Xid == -1 { res := &watcherEvent{} - _, err := decodePacket(buf[16:16+blen], res) + _, err := decodePacket(buf[16:blen], res) if err != nil { return err } @@ -653,7 +653,7 @@ func (c *Conn) recvLoop(conn net.Conn) error { if res.Err != 0 { err = res.Err.toError() } else { - _, err = decodePacket(buf[16:16+blen], req.recvStruct) + _, err = decodePacket(buf[16:blen], req.recvStruct) } if req.recvFunc != nil { req.recvFunc(req, &res, err) diff --git a/vendor/github.com/samuel/go-zookeeper/zk/constants_test.go b/vendor/github.com/samuel/go-zookeeper/zk/constants_test.go new file mode 100644 index 0000000000..9fe6b04ceb --- /dev/null +++ b/vendor/github.com/samuel/go-zookeeper/zk/constants_test.go @@ -0,0 +1,24 @@ +package zk + +import ( + "fmt" + "testing" +) + +func TestModeString(t *testing.T) { + if fmt.Sprintf("%v", ModeUnknown) != "unknown" { + t.Errorf("unknown value should be 'unknown'") + } + + if fmt.Sprintf("%v", ModeLeader) != "leader" { + t.Errorf("leader value should be 'leader'") + } + + if fmt.Sprintf("%v", ModeFollower) != "follower" { + t.Errorf("follower value should be 'follower'") + } + + if fmt.Sprintf("%v", ModeStandalone) != "standalone" { + t.Errorf("standlone value should be 'standalone'") + } +} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/dnshostprovider_test.go b/vendor/github.com/samuel/go-zookeeper/zk/dnshostprovider_test.go new file mode 100644 index 0000000000..954c5493d5 --- /dev/null +++ b/vendor/github.com/samuel/go-zookeeper/zk/dnshostprovider_test.go @@ -0,0 +1,222 @@ +package zk + +import ( + "fmt" + "log" + "testing" + "time" +) + +// localhostLookupHost is a test replacement for net.LookupHost that +// always returns 127.0.0.1 +func localhostLookupHost(host string) ([]string, error) { + return []string{"127.0.0.1"}, nil +} + +// TestDNSHostProviderCreate is just like TestCreate, but with an +// overridden HostProvider that ignores the provided hostname. +func TestDNSHostProviderCreate(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + + port := ts.Servers[0].Port + server := fmt.Sprintf("foo.example.com:%d", port) + hostProvider := &DNSHostProvider{lookupHost: localhostLookupHost} + zk, _, err := Connect([]string{server}, time.Second*15, WithHostProvider(hostProvider)) + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + path := "/gozk-test" + + if err := zk.Delete(path, -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { + t.Fatalf("Create returned error: %+v", err) + } else if p != path { + t.Fatalf("Create returned different path '%s' != '%s'", p, path) + } + if data, stat, err := zk.Get(path); err != nil { + t.Fatalf("Get returned error: %+v", err) + } else if stat == nil { + t.Fatal("Get returned nil stat") + } else if len(data) < 4 { + t.Fatal("Get returned wrong size data") + } +} + +// localHostPortsFacade wraps a HostProvider, remapping the +// address/port combinations it returns to "localhost:$PORT" where +// $PORT is chosen from the provided ports. +type localHostPortsFacade struct { + inner HostProvider // The wrapped HostProvider + ports []int // The provided list of ports + nextPort int // The next port to use + mapped map[string]string // Already-mapped address/port combinations +} + +func newLocalHostPortsFacade(inner HostProvider, ports []int) *localHostPortsFacade { + return &localHostPortsFacade{ + inner: inner, + ports: ports, + mapped: make(map[string]string), + } +} + +func (lhpf *localHostPortsFacade) Len() int { return lhpf.inner.Len() } +func (lhpf *localHostPortsFacade) Connected() { lhpf.inner.Connected() } +func (lhpf *localHostPortsFacade) Init(servers []string) error { return lhpf.inner.Init(servers) } +func (lhpf *localHostPortsFacade) Next() (string, bool) { + server, retryStart := lhpf.inner.Next() + + // If we've already set up a mapping for that server, just return it. + if localMapping := lhpf.mapped[server]; localMapping != "" { + return localMapping, retryStart + } + + if lhpf.nextPort == len(lhpf.ports) { + log.Fatalf("localHostPortsFacade out of ports to assign to %q; current config: %q", server, lhpf.mapped) + } + + localMapping := fmt.Sprintf("localhost:%d", lhpf.ports[lhpf.nextPort]) + lhpf.mapped[server] = localMapping + lhpf.nextPort++ + return localMapping, retryStart +} + +var _ HostProvider = &localHostPortsFacade{} + +// TestDNSHostProviderReconnect tests that the zk.Conn correctly +// reconnects when the Zookeeper instance it's connected to +// restarts. It wraps the DNSHostProvider in a lightweight facade that +// remaps addresses to localhost:$PORT combinations corresponding to +// the test ZooKeeper instances. +func TestDNSHostProviderReconnect(t *testing.T) { + ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + + innerHp := &DNSHostProvider{lookupHost: func(host string) ([]string, error) { + return []string{"192.0.2.1", "192.0.2.2", "192.0.2.3"}, nil + }} + ports := make([]int, 0, len(ts.Servers)) + for _, server := range ts.Servers { + ports = append(ports, server.Port) + } + hp := newLocalHostPortsFacade(innerHp, ports) + + zk, _, err := Connect([]string{"foo.example.com:12345"}, time.Second, WithHostProvider(hp)) + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + path := "/gozk-test" + + // Initial operation to force connection. + if err := zk.Delete(path, -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + + // Figure out which server we're connected to. + currentServer := zk.Server() + t.Logf("Connected to %q. Finding test server index…", currentServer) + serverIndex := -1 + for i, server := range ts.Servers { + server := fmt.Sprintf("localhost:%d", server.Port) + t.Logf("…trying %q", server) + if currentServer == server { + serverIndex = i + t.Logf("…found at index %d", i) + break + } + } + if serverIndex == -1 { + t.Fatalf("Cannot determine test server index.") + } + + // Restart the connected server. + ts.Servers[serverIndex].Srv.Stop() + ts.Servers[serverIndex].Srv.Start() + + // Continue with the basic TestCreate tests. + if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { + t.Fatalf("Create returned error: %+v", err) + } else if p != path { + t.Fatalf("Create returned different path '%s' != '%s'", p, path) + } + if data, stat, err := zk.Get(path); err != nil { + t.Fatalf("Get returned error: %+v", err) + } else if stat == nil { + t.Fatal("Get returned nil stat") + } else if len(data) < 4 { + t.Fatal("Get returned wrong size data") + } + + if zk.Server() == currentServer { + t.Errorf("Still connected to %q after restart.", currentServer) + } +} + +// TestDNSHostProviderRetryStart tests the `retryStart` functionality +// of DNSHostProvider. +// It's also probably the clearest visual explanation of exactly how +// it works. +func TestDNSHostProviderRetryStart(t *testing.T) { + hp := &DNSHostProvider{lookupHost: func(host string) ([]string, error) { + return []string{"192.0.2.1", "192.0.2.2", "192.0.2.3"}, nil + }} + + if err := hp.Init([]string{"foo.example.com:12345"}); err != nil { + t.Fatal(err) + } + + testdata := []struct { + retryStartWant bool + callConnected bool + }{ + // Repeated failures. + {false, false}, + {false, false}, + {false, false}, + {true, false}, + {false, false}, + {false, false}, + {true, true}, + + // One success offsets things. + {false, false}, + {false, true}, + {false, true}, + + // Repeated successes. + {false, true}, + {false, true}, + {false, true}, + {false, true}, + {false, true}, + + // And some more failures. + {false, false}, + {false, false}, + {true, false}, // Looped back to last known good server: all alternates failed. + {false, false}, + } + + for i, td := range testdata { + _, retryStartGot := hp.Next() + if retryStartGot != td.retryStartWant { + t.Errorf("%d: retryStart=%v; want %v", i, retryStartGot, td.retryStartWant) + } + if td.callConnected { + hp.Connected() + } + } +} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/flw.go b/vendor/github.com/samuel/go-zookeeper/zk/flw.go index 1045c98cfd..3e97f96876 100644 --- a/vendor/github.com/samuel/go-zookeeper/zk/flw.go +++ b/vendor/github.com/samuel/go-zookeeper/zk/flw.go @@ -5,10 +5,10 @@ import ( "bytes" "fmt" "io/ioutil" - "math/big" "net" "regexp" "strconv" + "strings" "time" ) @@ -22,7 +22,7 @@ import ( // which server had the issue. func FLWSrvr(servers []string, timeout time.Duration) ([]*ServerStats, bool) { // different parts of the regular expression that are required to parse the srvr output - var ( + const ( zrVer = `^Zookeeper version: ([A-Za-z0-9\.\-]+), built on (\d\d/\d\d/\d\d\d\d \d\d:\d\d [A-Za-z0-9:\+\-]+)` zrLat = `^Latency min/avg/max: (\d+)/(\d+)/(\d+)` zrNet = `^Received: (\d+).*\n^Sent: (\d+).*\n^Connections: (\d+).*\n^Outstanding: (\d+)` @@ -31,7 +31,6 @@ func FLWSrvr(servers []string, timeout time.Duration) ([]*ServerStats, bool) { // build the regex from the pieces above re, err := regexp.Compile(fmt.Sprintf(`(?m:\A%v.*\n%v.*\n%v.*\n%v)`, zrVer, zrLat, zrNet, zrState)) - if err != nil { return nil, false } @@ -152,14 +151,13 @@ func FLWRuok(servers []string, timeout time.Duration) []bool { // As with FLWSrvr, the boolean value indicates whether one of the requests had // an issue. The Clients struct has an Error value that can be checked. func FLWCons(servers []string, timeout time.Duration) ([]*ServerClients, bool) { - var ( + const ( zrAddr = `^ /((?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?):(?:\d+))\[\d+\]` zrPac = `\(queued=(\d+),recved=(\d+),sent=(\d+),sid=(0x[A-Za-z0-9]+),lop=(\w+),est=(\d+),to=(\d+),` zrSesh = `lcxid=(0x[A-Za-z0-9]+),lzxid=(0x[A-Za-z0-9]+),lresp=(\d+),llat=(\d+),minlat=(\d+),avglat=(\d+),maxlat=(\d+)\)` ) re, err := regexp.Compile(fmt.Sprintf("%v%v%v", zrAddr, zrPac, zrSesh)) - if err != nil { return nil, false } @@ -205,41 +203,21 @@ func FLWCons(servers []string, timeout time.Duration) ([]*ServerClients, bool) { sid, _ := strconv.ParseInt(match[4], 0, 64) est, _ := strconv.ParseInt(match[6], 0, 64) timeout, _ := strconv.ParseInt(match[7], 0, 32) + lcxid, _ := parseInt64(match[8]) + lzxid, _ := parseInt64(match[9]) lresp, _ := strconv.ParseInt(match[10], 0, 64) llat, _ := strconv.ParseInt(match[11], 0, 32) minlat, _ := strconv.ParseInt(match[12], 0, 32) avglat, _ := strconv.ParseInt(match[13], 0, 32) maxlat, _ := strconv.ParseInt(match[14], 0, 32) - // zookeeper returns a value, '0xffffffffffffffff', as the - // Lzxid for PING requests in the 'cons' output. - // unfortunately, in Go that is an invalid int64 and is not represented - // as -1. - // However, converting the string value to a big.Int and then back to - // and int64 properly sets the value to -1 - lzxid, ok := new(big.Int).SetString(match[9], 0) - - var errVal error - - if !ok { - errVal = fmt.Errorf("failed to convert lzxid value to big.Int") - imOk = false - } - - lcxid, ok := new(big.Int).SetString(match[8], 0) - - if !ok && errVal == nil { - errVal = fmt.Errorf("failed to convert lcxid value to big.Int") - imOk = false - } - clients = append(clients, &ServerClient{ Queued: queued, Received: recvd, Sent: sent, SessionID: sid, - Lcxid: lcxid.Int64(), - Lzxid: lzxid.Int64(), + Lcxid: int64(lcxid), + Lzxid: int64(lzxid), Timeout: int32(timeout), LastLatency: int32(llat), MinLatency: int32(minlat), @@ -249,7 +227,6 @@ func FLWCons(servers []string, timeout time.Duration) ([]*ServerClients, bool) { LastResponse: time.Unix(lresp, 0), Addr: match[0], LastOperation: match[5], - Error: errVal, }) } @@ -259,9 +236,17 @@ func FLWCons(servers []string, timeout time.Duration) ([]*ServerClients, bool) { return sc, imOk } +// parseInt64 is similar to strconv.ParseInt, but it also handles hex values that represent negative numbers +func parseInt64(s string) (int64, error) { + if strings.HasPrefix(s, "0x") { + i, err := strconv.ParseUint(s, 0, 64) + return int64(i), err + } + return strconv.ParseInt(s, 0, 64) +} + func fourLetterWord(server, command string, timeout time.Duration) ([]byte, error) { conn, err := net.DialTimeout("tcp", server, timeout) - if err != nil { return nil, err } @@ -271,20 +256,11 @@ func fourLetterWord(server, command string, timeout time.Duration) ([]byte, erro defer conn.Close() conn.SetWriteDeadline(time.Now().Add(timeout)) - _, err = conn.Write([]byte(command)) - if err != nil { return nil, err } conn.SetReadDeadline(time.Now().Add(timeout)) - - resp, err := ioutil.ReadAll(conn) - - if err != nil { - return nil, err - } - - return resp, nil + return ioutil.ReadAll(conn) } diff --git a/vendor/github.com/samuel/go-zookeeper/zk/flw_test.go b/vendor/github.com/samuel/go-zookeeper/zk/flw_test.go new file mode 100644 index 0000000000..1071b9371f --- /dev/null +++ b/vendor/github.com/samuel/go-zookeeper/zk/flw_test.go @@ -0,0 +1,327 @@ +package zk + +import ( + "net" + "testing" + "time" +) + +var ( + zkSrvrOut = `Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT +Latency min/avg/max: 0/1/10 +Received: 4207 +Sent: 4220 +Connections: 81 +Outstanding: 1 +Zxid: 0x110a7a8f37 +Mode: leader +Node count: 306 +` + zkConsOut = ` /10.42.45.231:45361[1](queued=0,recved=9435,sent=9457,sid=0x94c2989e04716b5,lop=PING,est=1427238717217,to=20001,lcxid=0x55120915,lzxid=0xffffffffffffffff,lresp=1427259255908,llat=0,minlat=0,avglat=1,maxlat=17) + /10.55.33.98:34342[1](queued=0,recved=9338,sent=9350,sid=0x94c2989e0471731,lop=PING,est=1427238849319,to=20001,lcxid=0x55120944,lzxid=0xffffffffffffffff,lresp=1427259252294,llat=0,minlat=0,avglat=1,maxlat=18) + /10.44.145.114:46556[1](queued=0,recved=109253,sent=109617,sid=0x94c2989e0471709,lop=DELE,est=1427238791305,to=20001,lcxid=0x55139618,lzxid=0x110a7b187d,lresp=1427259257423,llat=2,minlat=0,avglat=1,maxlat=23) + +` +) + +func TestFLWRuok(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer l.Close() + + go tcpServer(l, "") + + oks := FLWRuok([]string{l.Addr().String()}, time.Second*10) + if len(oks) == 0 { + t.Errorf("no values returned") + } + if !oks[0] { + t.Errorf("instance should be marked as OK") + } + + // + // Confirm that it also returns false for dead instances + // + l, err = net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer l.Close() + + go tcpServer(l, "dead") + + oks = FLWRuok([]string{l.Addr().String()}, time.Second*10) + if len(oks) == 0 { + t.Errorf("no values returned") + } + if oks[0] { + t.Errorf("instance should be marked as not OK") + } +} + +func TestFLWSrvr(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer l.Close() + + go tcpServer(l, "") + + statsSlice, ok := FLWSrvr([]string{l.Addr().String()}, time.Second*10) + if !ok { + t.Errorf("failure indicated on 'srvr' parsing") + } + if len(statsSlice) == 0 { + t.Errorf("no *ServerStats instances returned") + } + + stats := statsSlice[0] + + if stats.Error != nil { + t.Fatalf("error seen in stats: %v", err.Error()) + } + + if stats.Sent != 4220 { + t.Errorf("Sent != 4220") + } + + if stats.Received != 4207 { + t.Errorf("Received != 4207") + } + + if stats.NodeCount != 306 { + t.Errorf("NodeCount != 306") + } + + if stats.MinLatency != 0 { + t.Errorf("MinLatency != 0") + } + + if stats.AvgLatency != 1 { + t.Errorf("AvgLatency != 1") + } + + if stats.MaxLatency != 10 { + t.Errorf("MaxLatency != 10") + } + + if stats.Connections != 81 { + t.Errorf("Connection != 81") + } + + if stats.Outstanding != 1 { + t.Errorf("Outstanding != 1") + } + + if stats.Epoch != 17 { + t.Errorf("Epoch != 17") + } + + if stats.Counter != 175804215 { + t.Errorf("Counter != 175804215") + } + + if stats.Mode != ModeLeader { + t.Errorf("Mode != ModeLeader") + } + + if stats.Version != "3.4.6-1569965" { + t.Errorf("Version expected: 3.4.6-1569965") + } +} + +func TestFLWCons(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer l.Close() + + go tcpServer(l, "") + + clients, ok := FLWCons([]string{l.Addr().String()}, time.Second*10) + if !ok { + t.Errorf("failure indicated on 'cons' parsing") + } + if len(clients) == 0 { + t.Errorf("no *ServerClients instances returned") + } + + results := []*ServerClient{ + { + Queued: 0, + Received: 9435, + Sent: 9457, + SessionID: 669956116721374901, + LastOperation: "PING", + Established: time.Unix(1427238717217, 0), + Timeout: 20001, + Lcxid: 1427245333, + Lzxid: -1, + LastResponse: time.Unix(1427259255908, 0), + LastLatency: 0, + MinLatency: 0, + AvgLatency: 1, + MaxLatency: 17, + Addr: "10.42.45.231:45361", + }, + { + Queued: 0, + Received: 9338, + Sent: 9350, + SessionID: 669956116721375025, + LastOperation: "PING", + Established: time.Unix(1427238849319, 0), + Timeout: 20001, + Lcxid: 1427245380, + Lzxid: -1, + LastResponse: time.Unix(1427259252294, 0), + LastLatency: 0, + MinLatency: 0, + AvgLatency: 1, + MaxLatency: 18, + Addr: "10.55.33.98:34342", + }, + { + Queued: 0, + Received: 109253, + Sent: 109617, + SessionID: 669956116721374985, + LastOperation: "DELE", + Established: time.Unix(1427238791305, 0), + Timeout: 20001, + Lcxid: 1427346968, + Lzxid: 73190283389, + LastResponse: time.Unix(1427259257423, 0), + LastLatency: 2, + MinLatency: 0, + AvgLatency: 1, + MaxLatency: 23, + Addr: "10.44.145.114:46556", + }, + } + + for _, z := range clients { + if z.Error != nil { + t.Errorf("error seen: %v", err.Error()) + } + + for i, v := range z.Clients { + c := results[i] + + if v.Error != nil { + t.Errorf("client error seen: %v", err.Error()) + } + + if v.Queued != c.Queued { + t.Errorf("Queued value mismatch (%d/%d)", v.Queued, c.Queued) + } + + if v.Received != c.Received { + t.Errorf("Received value mismatch (%d/%d)", v.Received, c.Received) + } + + if v.Sent != c.Sent { + t.Errorf("Sent value mismatch (%d/%d)", v.Sent, c.Sent) + } + + if v.SessionID != c.SessionID { + t.Errorf("SessionID value mismatch (%d/%d)", v.SessionID, c.SessionID) + } + + if v.LastOperation != c.LastOperation { + t.Errorf("LastOperation value mismatch ('%v'/'%v')", v.LastOperation, c.LastOperation) + } + + if v.Timeout != c.Timeout { + t.Errorf("Timeout value mismatch (%d/%d)", v.Timeout, c.Timeout) + } + + if v.Lcxid != c.Lcxid { + t.Errorf("Lcxid value mismatch (%d/%d)", v.Lcxid, c.Lcxid) + } + + if v.Lzxid != c.Lzxid { + t.Errorf("Lzxid value mismatch (%d/%d)", v.Lzxid, c.Lzxid) + } + + if v.LastLatency != c.LastLatency { + t.Errorf("LastLatency value mismatch (%d/%d)", v.LastLatency, c.LastLatency) + } + + if v.MinLatency != c.MinLatency { + t.Errorf("MinLatency value mismatch (%d/%d)", v.MinLatency, c.MinLatency) + } + + if v.AvgLatency != c.AvgLatency { + t.Errorf("AvgLatency value mismatch (%d/%d)", v.AvgLatency, c.AvgLatency) + } + + if v.MaxLatency != c.MaxLatency { + t.Errorf("MaxLatency value mismatch (%d/%d)", v.MaxLatency, c.MaxLatency) + } + + if v.Addr != c.Addr { + t.Errorf("Addr value mismatch ('%v'/'%v')", v.Addr, c.Addr) + } + + if !c.Established.Equal(v.Established) { + t.Errorf("Established value mismatch (%v/%v)", c.Established, v.Established) + } + + if !c.LastResponse.Equal(v.LastResponse) { + t.Errorf("Established value mismatch (%v/%v)", c.LastResponse, v.LastResponse) + } + } + } +} + +func tcpServer(listener net.Listener, thing string) { + for { + conn, err := listener.Accept() + if err != nil { + return + } + go connHandler(conn, thing) + } +} + +func connHandler(conn net.Conn, thing string) { + defer conn.Close() + + data := make([]byte, 4) + + _, err := conn.Read(data) + if err != nil { + return + } + + switch string(data) { + case "ruok": + switch thing { + case "dead": + return + default: + conn.Write([]byte("imok")) + } + case "srvr": + switch thing { + case "dead": + return + default: + conn.Write([]byte(zkSrvrOut)) + } + case "cons": + switch thing { + case "dead": + return + default: + conn.Write([]byte(zkConsOut)) + } + default: + conn.Write([]byte("This ZooKeeper instance is not currently serving requests.")) + } +} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/lock_test.go b/vendor/github.com/samuel/go-zookeeper/zk/lock_test.go new file mode 100644 index 0000000000..8a3478a336 --- /dev/null +++ b/vendor/github.com/samuel/go-zookeeper/zk/lock_test.go @@ -0,0 +1,94 @@ +package zk + +import ( + "testing" + "time" +) + +func TestLock(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + acls := WorldACL(PermAll) + + l := NewLock(zk, "/test", acls) + if err := l.Lock(); err != nil { + t.Fatal(err) + } + if err := l.Unlock(); err != nil { + t.Fatal(err) + } + + val := make(chan int, 3) + + if err := l.Lock(); err != nil { + t.Fatal(err) + } + + l2 := NewLock(zk, "/test", acls) + go func() { + if err := l2.Lock(); err != nil { + t.Fatal(err) + } + val <- 2 + if err := l2.Unlock(); err != nil { + t.Fatal(err) + } + val <- 3 + }() + time.Sleep(time.Millisecond * 100) + + val <- 1 + if err := l.Unlock(); err != nil { + t.Fatal(err) + } + if x := <-val; x != 1 { + t.Fatalf("Expected 1 instead of %d", x) + } + if x := <-val; x != 2 { + t.Fatalf("Expected 2 instead of %d", x) + } + if x := <-val; x != 3 { + t.Fatalf("Expected 3 instead of %d", x) + } +} + +// This tests creating a lock with a path that's more than 1 node deep (e.g. "/test-multi-level/lock"), +// when a part of that path already exists (i.e. "/test-multi-level" node already exists). +func TestMultiLevelLock(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + acls := WorldACL(PermAll) + path := "/test-multi-level" + if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { + t.Fatalf("Create returned error: %+v", err) + } else if p != path { + t.Fatalf("Create returned different path '%s' != '%s'", p, path) + } + l := NewLock(zk, "/test-multi-level/lock", acls) + defer zk.Delete("/test-multi-level", -1) // Clean up what we've created for this test + defer zk.Delete("/test-multi-level/lock", -1) + if err := l.Lock(); err != nil { + t.Fatal(err) + } + if err := l.Unlock(); err != nil { + t.Fatal(err) + } +} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/server_help.go b/vendor/github.com/samuel/go-zookeeper/zk/server_help.go index a0e12cf433..b4e42aeef2 100644 --- a/vendor/github.com/samuel/go-zookeeper/zk/server_help.go +++ b/vendor/github.com/samuel/go-zookeeper/zk/server_help.go @@ -88,8 +88,10 @@ func StartTestCluster(size int, stdout, stderr io.Writer) (*TestCluster, error) Srv: srv, }) } + if err := cluster.waitForStart(5, time.Second); err != nil { + return nil, err + } success = true - time.Sleep(3 * time.Second) // Give the server time to become active. Should probably actually attempt to connect to verify. return cluster, nil } @@ -116,6 +118,50 @@ func (ts *TestCluster) Stop() error { srv.Srv.Stop() } defer os.RemoveAll(ts.Path) + return ts.waitForStop(5, 1*time.Second) +} + +// block until the cluster is up +func (ts *TestCluster) waitForStart(maxRetry int, interval time.Duration) error { + // verify that the servers are up with SRVR + serverAddrs := make([]string, len(ts.Servers)) + for i, s := range ts.Servers { + serverAddrs[i] = fmt.Sprintf("127.0.0.1:%d", s.Port) + } + + for i := 0; i < maxRetry; i++ { + _, ok := FLWSrvr(serverAddrs, time.Second) + if ok { + return nil + } + time.Sleep(interval) + } + return fmt.Errorf("unable to verify health of servers!") +} + +// block until the cluster is down +func (ts *TestCluster) waitForStop(maxRetry int, interval time.Duration) error { + // verify that the servers are up with RUOK + serverAddrs := make([]string, len(ts.Servers)) + for i, s := range ts.Servers { + serverAddrs[i] = fmt.Sprintf("127.0.0.1:%d", s.Port) + } + + var success bool + for i := 0; i < maxRetry && !success; i++ { + success = true + for _, ok := range FLWRuok(serverAddrs, time.Second) { + if ok { + success = false + } + } + if !success { + time.Sleep(interval) + } + } + if !success { + return fmt.Errorf("unable to verify servers are down!") + } return nil } diff --git a/vendor/github.com/samuel/go-zookeeper/zk/structs_test.go b/vendor/github.com/samuel/go-zookeeper/zk/structs_test.go new file mode 100644 index 0000000000..cafbbd95c2 --- /dev/null +++ b/vendor/github.com/samuel/go-zookeeper/zk/structs_test.go @@ -0,0 +1,71 @@ +package zk + +import ( + "reflect" + "testing" +) + +func TestEncodeDecodePacket(t *testing.T) { + encodeDecodeTest(t, &requestHeader{-2, 5}) + encodeDecodeTest(t, &connectResponse{1, 2, 3, nil}) + encodeDecodeTest(t, &connectResponse{1, 2, 3, []byte{4, 5, 6}}) + encodeDecodeTest(t, &getAclResponse{[]ACL{{12, "s", "anyone"}}, Stat{}}) + encodeDecodeTest(t, &getChildrenResponse{[]string{"foo", "bar"}}) + encodeDecodeTest(t, &pathWatchRequest{"path", true}) + encodeDecodeTest(t, &pathWatchRequest{"path", false}) + encodeDecodeTest(t, &CheckVersionRequest{"/", -1}) + encodeDecodeTest(t, &multiRequest{Ops: []multiRequestOp{{multiHeader{opCheck, false, -1}, &CheckVersionRequest{"/", -1}}}}) +} + +func encodeDecodeTest(t *testing.T, r interface{}) { + buf := make([]byte, 1024) + n, err := encodePacket(buf, r) + if err != nil { + t.Errorf("encodePacket returned non-nil error %+v\n", err) + return + } + t.Logf("%+v %x", r, buf[:n]) + r2 := reflect.New(reflect.ValueOf(r).Elem().Type()).Interface() + n2, err := decodePacket(buf[:n], r2) + if err != nil { + t.Errorf("decodePacket returned non-nil error %+v\n", err) + return + } + if n != n2 { + t.Errorf("sizes don't match: %d != %d", n, n2) + return + } + if !reflect.DeepEqual(r, r2) { + t.Errorf("results don't match: %+v != %+v", r, r2) + return + } +} + +func TestEncodeShortBuffer(t *testing.T) { + buf := make([]byte, 0) + _, err := encodePacket(buf, &requestHeader{1, 2}) + if err != ErrShortBuffer { + t.Errorf("encodePacket should return ErrShortBuffer on a short buffer instead of '%+v'", err) + return + } +} + +func TestDecodeShortBuffer(t *testing.T) { + buf := make([]byte, 0) + _, err := decodePacket(buf, &responseHeader{}) + if err != ErrShortBuffer { + t.Errorf("decodePacket should return ErrShortBuffer on a short buffer instead of '%+v'", err) + return + } +} + +func BenchmarkEncode(b *testing.B) { + buf := make([]byte, 4096) + st := &connectRequest{Passwd: []byte("1234567890")} + b.ReportAllocs() + for i := 0; i < b.N; i++ { + if _, err := encodePacket(buf, st); err != nil { + b.Fatal(err) + } + } +} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/throttle_test.go b/vendor/github.com/samuel/go-zookeeper/zk/throttle_test.go new file mode 100644 index 0000000000..633ce05fcd --- /dev/null +++ b/vendor/github.com/samuel/go-zookeeper/zk/throttle_test.go @@ -0,0 +1,136 @@ +/* +Copyright 2012 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Vendored from go4.org/net/throttle + +package zk + +import ( + "fmt" + "net" + "sync" + "time" +) + +const unitSize = 1400 // read/write chunk size. ~MTU size. + +type Rate struct { + KBps int // or 0, to not rate-limit bandwidth + Latency time.Duration +} + +// byteTime returns the time required for n bytes. +func (r Rate) byteTime(n int) time.Duration { + if r.KBps == 0 { + return 0 + } + return time.Duration(float64(n)/1024/float64(r.KBps)) * time.Second +} + +type Listener struct { + net.Listener + Down Rate // server Writes to Client + Up Rate // server Reads from client +} + +func (ln *Listener) Accept() (net.Conn, error) { + c, err := ln.Listener.Accept() + time.Sleep(ln.Up.Latency) + if err != nil { + return nil, err + } + tc := &conn{Conn: c, Down: ln.Down, Up: ln.Up} + tc.start() + return tc, nil +} + +type nErr struct { + n int + err error +} + +type writeReq struct { + writeAt time.Time + p []byte + resc chan nErr +} + +type conn struct { + net.Conn + Down Rate // for reads + Up Rate // for writes + + wchan chan writeReq + closeOnce sync.Once + closeErr error +} + +func (c *conn) start() { + c.wchan = make(chan writeReq, 1024) + go c.writeLoop() +} + +func (c *conn) writeLoop() { + for req := range c.wchan { + time.Sleep(req.writeAt.Sub(time.Now())) + var res nErr + for len(req.p) > 0 && res.err == nil { + writep := req.p + if len(writep) > unitSize { + writep = writep[:unitSize] + } + n, err := c.Conn.Write(writep) + time.Sleep(c.Up.byteTime(len(writep))) + res.n += n + res.err = err + req.p = req.p[n:] + } + req.resc <- res + } +} + +func (c *conn) Close() error { + c.closeOnce.Do(func() { + err := c.Conn.Close() + close(c.wchan) + c.closeErr = err + }) + return c.closeErr +} + +func (c *conn) Write(p []byte) (n int, err error) { + defer func() { + if e := recover(); e != nil { + n = 0 + err = fmt.Errorf("%v", err) + return + } + }() + resc := make(chan nErr, 1) + c.wchan <- writeReq{time.Now().Add(c.Up.Latency), p, resc} + res := <-resc + return res.n, res.err +} + +func (c *conn) Read(p []byte) (n int, err error) { + const max = 1024 + if len(p) > max { + p = p[:max] + } + n, err = c.Conn.Read(p) + time.Sleep(c.Down.byteTime(n)) + return +} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/util_test.go b/vendor/github.com/samuel/go-zookeeper/zk/util_test.go new file mode 100644 index 0000000000..b56f77550d --- /dev/null +++ b/vendor/github.com/samuel/go-zookeeper/zk/util_test.go @@ -0,0 +1,17 @@ +package zk + +import "testing" + +func TestFormatServers(t *testing.T) { + servers := []string{"127.0.0.1:2181", "127.0.0.42", "127.0.42.1:8811"} + r := []string{"127.0.0.1:2181", "127.0.0.42:2181", "127.0.42.1:8811"} + + var s []string + s = FormatServers(servers) + + for i := range s { + if s[i] != r[i] { + t.Errorf("%v should equal %v", s[i], r[i]) + } + } +} diff --git a/vendor/github.com/samuel/go-zookeeper/zk/zk_test.go b/vendor/github.com/samuel/go-zookeeper/zk/zk_test.go new file mode 100644 index 0000000000..9fe395cfe4 --- /dev/null +++ b/vendor/github.com/samuel/go-zookeeper/zk/zk_test.go @@ -0,0 +1,573 @@ +package zk + +import ( + "crypto/rand" + "encoding/hex" + "fmt" + "io" + "net" + "strings" + "testing" + "time" +) + +func TestCreate(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + path := "/gozk-test" + + if err := zk.Delete(path, -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { + t.Fatalf("Create returned error: %+v", err) + } else if p != path { + t.Fatalf("Create returned different path '%s' != '%s'", p, path) + } + if data, stat, err := zk.Get(path); err != nil { + t.Fatalf("Get returned error: %+v", err) + } else if stat == nil { + t.Fatal("Get returned nil stat") + } else if len(data) < 4 { + t.Fatal("Get returned wrong size data") + } +} + +func TestMulti(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + path := "/gozk-test" + + if err := zk.Delete(path, -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + ops := []interface{}{ + &CreateRequest{Path: path, Data: []byte{1, 2, 3, 4}, Acl: WorldACL(PermAll)}, + &SetDataRequest{Path: path, Data: []byte{1, 2, 3, 4}, Version: -1}, + } + if res, err := zk.Multi(ops...); err != nil { + t.Fatalf("Multi returned error: %+v", err) + } else if len(res) != 2 { + t.Fatalf("Expected 2 responses got %d", len(res)) + } else { + t.Logf("%+v", res) + } + if data, stat, err := zk.Get(path); err != nil { + t.Fatalf("Get returned error: %+v", err) + } else if stat == nil { + t.Fatal("Get returned nil stat") + } else if len(data) < 4 { + t.Fatal("Get returned wrong size data") + } +} + +func TestGetSetACL(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + if err := zk.AddAuth("digest", []byte("blah")); err != nil { + t.Fatalf("AddAuth returned error %+v", err) + } + + path := "/gozk-test" + + if err := zk.Delete(path, -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + if path, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { + t.Fatalf("Create returned error: %+v", err) + } else if path != "/gozk-test" { + t.Fatalf("Create returned different path '%s' != '/gozk-test'", path) + } + + expected := WorldACL(PermAll) + + if acl, stat, err := zk.GetACL(path); err != nil { + t.Fatalf("GetACL returned error %+v", err) + } else if stat == nil { + t.Fatalf("GetACL returned nil Stat") + } else if len(acl) != 1 || expected[0] != acl[0] { + t.Fatalf("GetACL mismatch expected %+v instead of %+v", expected, acl) + } + + expected = []ACL{{PermAll, "ip", "127.0.0.1"}} + + if stat, err := zk.SetACL(path, expected, -1); err != nil { + t.Fatalf("SetACL returned error %+v", err) + } else if stat == nil { + t.Fatalf("SetACL returned nil Stat") + } + + if acl, stat, err := zk.GetACL(path); err != nil { + t.Fatalf("GetACL returned error %+v", err) + } else if stat == nil { + t.Fatalf("GetACL returned nil Stat") + } else if len(acl) != 1 || expected[0] != acl[0] { + t.Fatalf("GetACL mismatch expected %+v instead of %+v", expected, acl) + } +} + +func TestAuth(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + path := "/gozk-digest-test" + if err := zk.Delete(path, -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + + acl := DigestACL(PermAll, "user", "password") + + if p, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, acl); err != nil { + t.Fatalf("Create returned error: %+v", err) + } else if p != path { + t.Fatalf("Create returned different path '%s' != '%s'", p, path) + } + + if a, stat, err := zk.GetACL(path); err != nil { + t.Fatalf("GetACL returned error %+v", err) + } else if stat == nil { + t.Fatalf("GetACL returned nil Stat") + } else if len(a) != 1 || acl[0] != a[0] { + t.Fatalf("GetACL mismatch expected %+v instead of %+v", acl, a) + } + + if _, _, err := zk.Get(path); err != ErrNoAuth { + t.Fatalf("Get returned error %+v instead of ErrNoAuth", err) + } + + if err := zk.AddAuth("digest", []byte("user:password")); err != nil { + t.Fatalf("AddAuth returned error %+v", err) + } + + if data, stat, err := zk.Get(path); err != nil { + t.Fatalf("Get returned error %+v", err) + } else if stat == nil { + t.Fatalf("Get returned nil Stat") + } else if len(data) != 4 { + t.Fatalf("Get returned wrong data length") + } +} + +func TestChildren(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + deleteNode := func(node string) { + if err := zk.Delete(node, -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + } + + deleteNode("/gozk-test-big") + + if path, err := zk.Create("/gozk-test-big", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { + t.Fatalf("Create returned error: %+v", err) + } else if path != "/gozk-test-big" { + t.Fatalf("Create returned different path '%s' != '/gozk-test-big'", path) + } + + rb := make([]byte, 1000) + hb := make([]byte, 2000) + prefix := []byte("/gozk-test-big/") + for i := 0; i < 10000; i++ { + _, err := rand.Read(rb) + if err != nil { + t.Fatal("Cannot create random znode name") + } + hex.Encode(hb, rb) + + expect := string(append(prefix, hb...)) + if path, err := zk.Create(expect, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { + t.Fatalf("Create returned error: %+v", err) + } else if path != expect { + t.Fatalf("Create returned different path '%s' != '%s'", path, expect) + } + defer deleteNode(string(expect)) + } + + children, _, err := zk.Children("/gozk-test-big") + if err != nil { + t.Fatalf("Children returned error: %+v", err) + } else if len(children) != 10000 { + t.Fatal("Children returned wrong number of nodes") + } +} + +func TestChildWatch(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + + children, stat, childCh, err := zk.ChildrenW("/") + if err != nil { + t.Fatalf("Children returned error: %+v", err) + } else if stat == nil { + t.Fatal("Children returned nil stat") + } else if len(children) < 1 { + t.Fatal("Children should return at least 1 child") + } + + if path, err := zk.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { + t.Fatalf("Create returned error: %+v", err) + } else if path != "/gozk-test" { + t.Fatalf("Create returned different path '%s' != '/gozk-test'", path) + } + + select { + case ev := <-childCh: + if ev.Err != nil { + t.Fatalf("Child watcher error %+v", ev.Err) + } + if ev.Path != "/" { + t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/") + } + case _ = <-time.After(time.Second * 2): + t.Fatal("Child watcher timed out") + } + + // Delete of the watched node should trigger the watch + + children, stat, childCh, err = zk.ChildrenW("/gozk-test") + if err != nil { + t.Fatalf("Children returned error: %+v", err) + } else if stat == nil { + t.Fatal("Children returned nil stat") + } else if len(children) != 0 { + t.Fatal("Children should return 0 children") + } + + if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + + select { + case ev := <-childCh: + if ev.Err != nil { + t.Fatalf("Child watcher error %+v", ev.Err) + } + if ev.Path != "/gozk-test" { + t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/") + } + case _ = <-time.After(time.Second * 2): + t.Fatal("Child watcher timed out") + } +} + +func TestSetWatchers(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + zk.reconnectDelay = time.Second + + zk2, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk2.Close() + + if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + + testPath, err := zk.Create("/gozk-test-2", []byte{}, 0, WorldACL(PermAll)) + if err != nil { + t.Fatalf("Create returned: %+v", err) + } + + _, _, testEvCh, err := zk.GetW(testPath) + if err != nil { + t.Fatalf("GetW returned: %+v", err) + } + + children, stat, childCh, err := zk.ChildrenW("/") + if err != nil { + t.Fatalf("Children returned error: %+v", err) + } else if stat == nil { + t.Fatal("Children returned nil stat") + } else if len(children) < 1 { + t.Fatal("Children should return at least 1 child") + } + + // Simulate network error by brutally closing the network connection. + zk.conn.Close() + if err := zk2.Delete(testPath, -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + // Allow some time for the `zk` session to reconnect and set watches. + time.Sleep(time.Millisecond * 100) + + if path, err := zk2.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { + t.Fatalf("Create returned error: %+v", err) + } else if path != "/gozk-test" { + t.Fatalf("Create returned different path '%s' != '/gozk-test'", path) + } + + select { + case ev := <-testEvCh: + if ev.Err != nil { + t.Fatalf("GetW watcher error %+v", ev.Err) + } + if ev.Path != testPath { + t.Fatalf("GetW watcher wrong path %s instead of %s", ev.Path, testPath) + } + case <-time.After(2 * time.Second): + t.Fatal("GetW watcher timed out") + } + + select { + case ev := <-childCh: + if ev.Err != nil { + t.Fatalf("Child watcher error %+v", ev.Err) + } + if ev.Path != "/" { + t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/") + } + case <-time.After(2 * time.Second): + t.Fatal("Child watcher timed out") + } +} + +func TestExpiringWatch(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + if err := zk.Delete("/gozk-test", -1); err != nil && err != ErrNoNode { + t.Fatalf("Delete returned error: %+v", err) + } + + children, stat, childCh, err := zk.ChildrenW("/") + if err != nil { + t.Fatalf("Children returned error: %+v", err) + } else if stat == nil { + t.Fatal("Children returned nil stat") + } else if len(children) < 1 { + t.Fatal("Children should return at least 1 child") + } + + zk.sessionID = 99999 + zk.conn.Close() + + select { + case ev := <-childCh: + if ev.Err != ErrSessionExpired { + t.Fatalf("Child watcher error %+v instead of expected ErrSessionExpired", ev.Err) + } + if ev.Path != "/" { + t.Fatalf("Child watcher wrong path %s instead of %s", ev.Path, "/") + } + case <-time.After(2 * time.Second): + t.Fatal("Child watcher timed out") + } +} + +func TestRequestFail(t *testing.T) { + // If connecting fails to all servers in the list then pending requests + // should be errored out so they don't hang forever. + + zk, _, err := Connect([]string{"127.0.0.1:32444"}, time.Second*15) + if err != nil { + t.Fatal(err) + } + defer zk.Close() + + ch := make(chan error) + go func() { + _, _, err := zk.Get("/blah") + ch <- err + }() + select { + case err := <-ch: + if err == nil { + t.Fatal("Expected non-nil error on failed request due to connection failure") + } + case <-time.After(time.Second * 2): + t.Fatal("Get hung when connection could not be made") + } +} + +func TestSlowServer(t *testing.T) { + ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + + realAddr := fmt.Sprintf("127.0.0.1:%d", ts.Servers[0].Port) + proxyAddr, stopCh, err := startSlowProxy(t, + Rate{}, Rate{}, + realAddr, func(ln *Listener) { + if ln.Up.Latency == 0 { + ln.Up.Latency = time.Millisecond * 2000 + ln.Down.Latency = time.Millisecond * 2000 + } else { + ln.Up.Latency = 0 + ln.Down.Latency = 0 + } + }) + if err != nil { + t.Fatal(err) + } + defer close(stopCh) + + zk, _, err := Connect([]string{proxyAddr}, time.Millisecond*500) + if err != nil { + t.Fatal(err) + } + defer zk.Close() + + _, _, wch, err := zk.ChildrenW("/") + if err != nil { + t.Fatal(err) + } + + // Force a reconnect to get a throttled connection + zk.conn.Close() + + time.Sleep(time.Millisecond * 100) + + if err := zk.Delete("/gozk-test", -1); err == nil { + t.Fatal("Delete should have failed") + } + + // The previous request should have timed out causing the server to be disconnected and reconnected + + if _, err := zk.Create("/gozk-test", []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err != nil { + t.Fatal(err) + } + + // Make sure event is still returned because the session should not have been affected + select { + case ev := <-wch: + t.Logf("Received event: %+v", ev) + case <-time.After(time.Second): + t.Fatal("Expected to receive a watch event") + } +} + +func startSlowProxy(t *testing.T, up, down Rate, upstream string, adj func(ln *Listener)) (string, chan bool, error) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return "", nil, err + } + tln := &Listener{ + Listener: ln, + Up: up, + Down: down, + } + stopCh := make(chan bool) + go func() { + <-stopCh + tln.Close() + }() + go func() { + for { + cn, err := tln.Accept() + if err != nil { + if !strings.Contains(err.Error(), "use of closed network connection") { + t.Fatalf("Accept failed: %s", err.Error()) + } + return + } + if adj != nil { + adj(tln) + } + go func(cn net.Conn) { + defer cn.Close() + upcn, err := net.Dial("tcp", upstream) + if err != nil { + t.Log(err) + return + } + // This will leave hanging goroutines util stopCh is closed + // but it doesn't matter in the context of running tests. + go func() { + <-stopCh + upcn.Close() + }() + go func() { + if _, err := io.Copy(upcn, cn); err != nil { + if !strings.Contains(err.Error(), "use of closed network connection") { + // log.Printf("Upstream write failed: %s", err.Error()) + } + } + }() + if _, err := io.Copy(cn, upcn); err != nil { + if !strings.Contains(err.Error(), "use of closed network connection") { + // log.Printf("Upstream read failed: %s", err.Error()) + } + } + }(cn) + } + }() + return ln.Addr().String(), stopCh, nil +}