From 080d8b85058194d8a01c850867d2c7eb3762d4b9 Mon Sep 17 00:00:00 2001 From: Eric Buth Date: Sun, 24 May 2015 12:38:49 -0400 Subject: [PATCH] added etcd as a non-HA storage backend, updated documentation --- Godeps/Godeps.json | 9 + .../aws-sdk-go/internal/apierr/error.go | 139 +++++ .../coreos/go-etcd/etcd/add_child.go | 23 + .../coreos/go-etcd/etcd/add_child_test.go | 73 +++ .../github.com/coreos/go-etcd/etcd/client.go | 481 ++++++++++++++++++ .../coreos/go-etcd/etcd/client_test.go | 108 ++++ .../github.com/coreos/go-etcd/etcd/cluster.go | 37 ++ .../coreos/go-etcd/etcd/compare_and_delete.go | 34 ++ .../go-etcd/etcd/compare_and_delete_test.go | 46 ++ .../coreos/go-etcd/etcd/compare_and_swap.go | 36 ++ .../go-etcd/etcd/compare_and_swap_test.go | 57 +++ .../github.com/coreos/go-etcd/etcd/debug.go | 55 ++ .../coreos/go-etcd/etcd/debug_test.go | 28 + .../github.com/coreos/go-etcd/etcd/delete.go | 40 ++ .../coreos/go-etcd/etcd/delete_test.go | 81 +++ .../github.com/coreos/go-etcd/etcd/error.go | 49 ++ .../src/github.com/coreos/go-etcd/etcd/get.go | 32 ++ .../coreos/go-etcd/etcd/get_test.go | 131 +++++ .../github.com/coreos/go-etcd/etcd/member.go | 30 ++ .../coreos/go-etcd/etcd/member_test.go | 71 +++ .../github.com/coreos/go-etcd/etcd/options.go | 72 +++ .../coreos/go-etcd/etcd/requests.go | 403 +++++++++++++++ .../coreos/go-etcd/etcd/requests_test.go | 22 + .../coreos/go-etcd/etcd/response.go | 89 ++++ .../coreos/go-etcd/etcd/set_curl_chan_test.go | 42 ++ .../coreos/go-etcd/etcd/set_update_create.go | 137 +++++ .../go-etcd/etcd/set_update_create_test.go | 241 +++++++++ .../github.com/coreos/go-etcd/etcd/version.go | 6 + .../github.com/coreos/go-etcd/etcd/watch.go | 103 ++++ .../coreos/go-etcd/etcd/watch_test.go | 119 +++++ physical/etcd.go | 169 ++++++ physical/etcd_test.go | 40 ++ physical/physical.go | 1 + website/source/docs/config/index.html.md | 14 + 34 files changed, 3018 insertions(+) create mode 100644 Godeps/_workspace/src/github.com/awslabs/aws-sdk-go/internal/apierr/error.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/add_child.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/add_child_test.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client_test.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/cluster.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_delete.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_delete_test.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_swap.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/debug.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/debug_test.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/delete.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/delete_test.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/error.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/get.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/get_test.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/options.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests_test.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/response.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_update_create.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_update_create_test.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/version.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/watch.go create mode 100644 Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/watch_test.go create mode 100644 physical/etcd.go create mode 100644 physical/etcd_test.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index daf531be30..6fd5d59155 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -14,6 +14,10 @@ "ImportPath": "github.com/awslabs/aws-sdk-go/aws", "Rev": "29717a72a2fc649e790ce97dc2c4d96e32950844" }, + { + "ImportPath": "github.com/awslabs/aws-sdk-go/internal/apierr", + "Rev": "29717a72a2fc649e790ce97dc2c4d96e32950844" + }, { "ImportPath": "github.com/awslabs/aws-sdk-go/internal/endpoints", "Rev": "29717a72a2fc649e790ce97dc2c4d96e32950844" @@ -42,6 +46,11 @@ "ImportPath": "github.com/awslabs/aws-sdk-go/service/s3", "Rev": "29717a72a2fc649e790ce97dc2c4d96e32950844" }, + { + "ImportPath": "github.com/coreos/go-etcd/etcd", + "Comment": "v2.0.0-7-g73a8ef7", + "Rev": "73a8ef737e8ea002281a28b4cb92a1de121ad4c6" + }, { "ImportPath": "github.com/go-sql-driver/mysql", "Comment": "v1.2-88-ga197e5d", diff --git a/Godeps/_workspace/src/github.com/awslabs/aws-sdk-go/internal/apierr/error.go b/Godeps/_workspace/src/github.com/awslabs/aws-sdk-go/internal/apierr/error.go new file mode 100644 index 0000000000..58eb294764 --- /dev/null +++ b/Godeps/_workspace/src/github.com/awslabs/aws-sdk-go/internal/apierr/error.go @@ -0,0 +1,139 @@ +// Package of API error types. +package apierr + +import "fmt" + +// A BaseError wraps the code and message which defines an error. It also +// can be used to wrap an original error object. +// +// Should be used as the root for errors satisfying the awserr.Error. Also +// for any error which does not fit into a specific error wrapper type. +type BaseError struct { + // Classification of error + code string + + // Detailed information about error + message string + + // Optional original error this error is based off of. Allows building + // chained errors. + origErr error +} + +// New returns an error object for the code, message, and err. +// +// code is a short no whitespace phrase depicting the classification of +// the error that is being created. +// +// message is the free flow string containing detailed information about the error. +// +// origErr is the error object which will be nested under the new error to be returned. +func New(code, message string, origErr error) *BaseError { + return &BaseError{ + code: code, + message: message, + origErr: origErr, + } +} + +// Error returns the string representation of the error. +// +// See ErrorWithExtra for formatting. +// +// Satisfies the error interface. +func (b *BaseError) Error() string { + return b.ErrorWithExtra("") +} + +// String returns the string representation of the error. +// Alias for Error to satisfy the stringer interface. +func (b *BaseError) String() string { + return b.Error() +} + +// Code returns the short phrase depicting the classification of the error. +func (b *BaseError) Code() string { + return b.code +} + +// Message returns the error details message. +func (b *BaseError) Message() string { + return b.message +} + +// Err returns the original error if one was set. Nil is returned if no error +// was set. +func (b *BaseError) OrigErr() error { + return b.origErr +} + +// ErrorWithExtra is a helper method to add an extra string to the stratified +// error message. The extra message will be added on the next line below the +// error message like the following: +// +// : +// +// +// If there is a original error the error will be included on a new line. +// +// : +// +// caused by: +func (b *BaseError) ErrorWithExtra(extra string) string { + msg := fmt.Sprintf("%s: %s", b.code, b.message) + if extra != "" { + msg = fmt.Sprintf("%s\n\t%s", msg, extra) + } + if b.origErr != nil { + msg = fmt.Sprintf("%s\ncaused by: %s", msg, b.origErr.Error()) + } + return msg +} + +// A RequestError wraps a request or service error. +// +// Composed of BaseError for code, message, and original error. +type RequestError struct { + *BaseError + statusCode int + requestID string +} + +// NewRequestError returns a wrapped error with additional information for request +// status code, and service requestID. +// +// Should be used to wrap all request which involve service requests. Even if +// the request failed without a service response, but had an HTTP status code +// that may be meaningful. +// +// Also wraps original errors via the BaseError. +func NewRequestError(base *BaseError, statusCode int, requestID string) *RequestError { + return &RequestError{ + BaseError: base, + statusCode: statusCode, + requestID: requestID, + } +} + +// Error returns the string representation of the error. +// Satisfies the error interface. +func (r *RequestError) Error() string { + return r.ErrorWithExtra(fmt.Sprintf("status code: %d, request id: [%s]", + r.statusCode, r.requestID)) +} + +// String returns the string representation of the error. +// Alias for Error to satisfy the stringer interface. +func (r *RequestError) String() string { + return r.Error() +} + +// StatusCode returns the wrapped status code for the error +func (r *RequestError) StatusCode() int { + return r.statusCode +} + +// RequestID returns the wrapped requestID +func (r *RequestError) RequestID() string { + return r.requestID +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/add_child.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/add_child.go new file mode 100644 index 0000000000..7122be049e --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/add_child.go @@ -0,0 +1,23 @@ +package etcd + +// Add a new directory with a random etcd-generated key under the given path. +func (c *Client) AddChildDir(key string, ttl uint64) (*Response, error) { + raw, err := c.post(key, "", ttl) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +// Add a new file with a random etcd-generated key under the given path. +func (c *Client) AddChild(key string, value string, ttl uint64) (*Response, error) { + raw, err := c.post(key, value, ttl) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/add_child_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/add_child_test.go new file mode 100644 index 0000000000..26223ff1c8 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/add_child_test.go @@ -0,0 +1,73 @@ +package etcd + +import "testing" + +func TestAddChild(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("fooDir", true) + c.Delete("nonexistentDir", true) + }() + + c.CreateDir("fooDir", 5) + + _, err := c.AddChild("fooDir", "v0", 5) + if err != nil { + t.Fatal(err) + } + + _, err = c.AddChild("fooDir", "v1", 5) + if err != nil { + t.Fatal(err) + } + + resp, err := c.Get("fooDir", true, false) + // The child with v0 should proceed the child with v1 because it's added + // earlier, so it should have a lower key. + if !(len(resp.Node.Nodes) == 2 && (resp.Node.Nodes[0].Value == "v0" && resp.Node.Nodes[1].Value == "v1")) { + t.Fatalf("AddChild 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+ + " The response was: %#v", resp) + } + + // Creating a child under a nonexistent directory should succeed. + // The directory should be created. + resp, err = c.AddChild("nonexistentDir", "foo", 5) + if err != nil { + t.Fatal(err) + } +} + +func TestAddChildDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("fooDir", true) + c.Delete("nonexistentDir", true) + }() + + c.CreateDir("fooDir", 5) + + _, err := c.AddChildDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + _, err = c.AddChildDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + resp, err := c.Get("fooDir", true, false) + // The child with v0 should proceed the child with v1 because it's added + // earlier, so it should have a lower key. + if !(len(resp.Node.Nodes) == 2 && (len(resp.Node.Nodes[0].Nodes) == 0 && len(resp.Node.Nodes[1].Nodes) == 0)) { + t.Fatalf("AddChildDir 1 failed. There should be two chlidren whose values are v0 and v1, respectively."+ + " The response was: %#v", resp) + } + + // Creating a child under a nonexistent directory should succeed. + // The directory should be created. + resp, err = c.AddChildDir("nonexistentDir", 5) + if err != nil { + t.Fatal(err) + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client.go new file mode 100644 index 0000000000..c6cf3341ba --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client.go @@ -0,0 +1,481 @@ +package etcd + +import ( + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "io" + "io/ioutil" + "math/rand" + "net" + "net/http" + "net/url" + "os" + "path" + "strings" + "time" +) + +// See SetConsistency for how to use these constants. +const ( + // Using strings rather than iota because the consistency level + // could be persisted to disk, so it'd be better to use + // human-readable values. + STRONG_CONSISTENCY = "STRONG" + WEAK_CONSISTENCY = "WEAK" +) + +const ( + defaultBufferSize = 10 +) + +func init() { + rand.Seed(int64(time.Now().Nanosecond())) +} + +type Config struct { + CertFile string `json:"certFile"` + KeyFile string `json:"keyFile"` + CaCertFile []string `json:"caCertFiles"` + DialTimeout time.Duration `json:"timeout"` + Consistency string `json:"consistency"` +} + +type credentials struct { + username string + password string +} + +type Client struct { + config Config `json:"config"` + cluster *Cluster `json:"cluster"` + httpClient *http.Client + credentials *credentials + transport *http.Transport + persistence io.Writer + cURLch chan string + // CheckRetry can be used to control the policy for failed requests + // and modify the cluster if needed. + // The client calls it before sending requests again, and + // stops retrying if CheckRetry returns some error. The cases that + // this function needs to handle include no response and unexpected + // http status code of response. + // If CheckRetry is nil, client will call the default one + // `DefaultCheckRetry`. + // Argument cluster is the etcd.Cluster object that these requests have been made on. + // Argument numReqs is the number of http.Requests that have been made so far. + // Argument lastResp is the http.Responses from the last request. + // Argument err is the reason of the failure. + CheckRetry func(cluster *Cluster, numReqs int, + lastResp http.Response, err error) error +} + +// NewClient create a basic client that is configured to be used +// with the given machine list. +func NewClient(machines []string) *Client { + config := Config{ + // default timeout is one second + DialTimeout: time.Second, + Consistency: WEAK_CONSISTENCY, + } + + client := &Client{ + cluster: NewCluster(machines), + config: config, + } + + client.initHTTPClient() + client.saveConfig() + + return client +} + +// NewTLSClient create a basic client with TLS configuration +func NewTLSClient(machines []string, cert, key, caCert string) (*Client, error) { + // overwrite the default machine to use https + if len(machines) == 0 { + machines = []string{"https://127.0.0.1:4001"} + } + + config := Config{ + // default timeout is one second + DialTimeout: time.Second, + Consistency: WEAK_CONSISTENCY, + CertFile: cert, + KeyFile: key, + CaCertFile: make([]string, 0), + } + + client := &Client{ + cluster: NewCluster(machines), + config: config, + } + + err := client.initHTTPSClient(cert, key) + if err != nil { + return nil, err + } + + err = client.AddRootCA(caCert) + + client.saveConfig() + + return client, nil +} + +// NewClientFromFile creates a client from a given file path. +// The given file is expected to use the JSON format. +func NewClientFromFile(fpath string) (*Client, error) { + fi, err := os.Open(fpath) + if err != nil { + return nil, err + } + + defer func() { + if err := fi.Close(); err != nil { + panic(err) + } + }() + + return NewClientFromReader(fi) +} + +// NewClientFromReader creates a Client configured from a given reader. +// The configuration is expected to use the JSON format. +func NewClientFromReader(reader io.Reader) (*Client, error) { + c := new(Client) + + b, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + + err = json.Unmarshal(b, c) + if err != nil { + return nil, err + } + if c.config.CertFile == "" { + c.initHTTPClient() + } else { + err = c.initHTTPSClient(c.config.CertFile, c.config.KeyFile) + } + + if err != nil { + return nil, err + } + + for _, caCert := range c.config.CaCertFile { + if err := c.AddRootCA(caCert); err != nil { + return nil, err + } + } + + return c, nil +} + +// Override the Client's HTTP Transport object +func (c *Client) SetTransport(tr *http.Transport) { + c.httpClient.Transport = tr + c.transport = tr +} + +func (c *Client) SetCredentials(username, password string) { + c.credentials = &credentials{username, password} +} + +func (c *Client) Close() { + c.transport.DisableKeepAlives = true + c.transport.CloseIdleConnections() +} + +// initHTTPClient initializes a HTTP client for etcd client +func (c *Client) initHTTPClient() { + c.transport = &http.Transport{ + Dial: c.dial, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + c.httpClient = &http.Client{Transport: c.transport} +} + +// initHTTPClient initializes a HTTPS client for etcd client +func (c *Client) initHTTPSClient(cert, key string) error { + if cert == "" || key == "" { + return errors.New("Require both cert and key path") + } + + tlsCert, err := tls.LoadX509KeyPair(cert, key) + if err != nil { + return err + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{tlsCert}, + InsecureSkipVerify: true, + } + + tr := &http.Transport{ + TLSClientConfig: tlsConfig, + Dial: c.dial, + } + + c.httpClient = &http.Client{Transport: tr} + return nil +} + +// SetPersistence sets a writer to which the config will be +// written every time it's changed. +func (c *Client) SetPersistence(writer io.Writer) { + c.persistence = writer +} + +// SetConsistency changes the consistency level of the client. +// +// When consistency is set to STRONG_CONSISTENCY, all requests, +// including GET, are sent to the leader. This means that, assuming +// the absence of leader failures, GET requests are guaranteed to see +// the changes made by previous requests. +// +// When consistency is set to WEAK_CONSISTENCY, other requests +// are still sent to the leader, but GET requests are sent to a +// random server from the server pool. This reduces the read +// load on the leader, but it's not guaranteed that the GET requests +// will see changes made by previous requests (they might have not +// yet been committed on non-leader servers). +func (c *Client) SetConsistency(consistency string) error { + if !(consistency == STRONG_CONSISTENCY || consistency == WEAK_CONSISTENCY) { + return errors.New("The argument must be either STRONG_CONSISTENCY or WEAK_CONSISTENCY.") + } + c.config.Consistency = consistency + return nil +} + +// Sets the DialTimeout value +func (c *Client) SetDialTimeout(d time.Duration) { + c.config.DialTimeout = d +} + +// AddRootCA adds a root CA cert for the etcd client +func (c *Client) AddRootCA(caCert string) error { + if c.httpClient == nil { + return errors.New("Client has not been initialized yet!") + } + + certBytes, err := ioutil.ReadFile(caCert) + if err != nil { + return err + } + + tr, ok := c.httpClient.Transport.(*http.Transport) + + if !ok { + panic("AddRootCA(): Transport type assert should not fail") + } + + if tr.TLSClientConfig.RootCAs == nil { + caCertPool := x509.NewCertPool() + ok = caCertPool.AppendCertsFromPEM(certBytes) + if ok { + tr.TLSClientConfig.RootCAs = caCertPool + } + tr.TLSClientConfig.InsecureSkipVerify = false + } else { + ok = tr.TLSClientConfig.RootCAs.AppendCertsFromPEM(certBytes) + } + + if !ok { + err = errors.New("Unable to load caCert") + } + + c.config.CaCertFile = append(c.config.CaCertFile, caCert) + c.saveConfig() + + return err +} + +// SetCluster updates cluster information using the given machine list. +func (c *Client) SetCluster(machines []string) bool { + success := c.internalSyncCluster(machines) + return success +} + +func (c *Client) GetCluster() []string { + return c.cluster.Machines +} + +// SyncCluster updates the cluster information using the internal machine list. +func (c *Client) SyncCluster() bool { + return c.internalSyncCluster(c.cluster.Machines) +} + +// internalSyncCluster syncs cluster information using the given machine list. +func (c *Client) internalSyncCluster(machines []string) bool { + for _, machine := range machines { + httpPath := c.createHttpPath(machine, path.Join(version, "members")) + resp, err := c.httpClient.Get(httpPath) + if err != nil { + // try another machine in the cluster + continue + } + + if resp.StatusCode != http.StatusOK { // fall-back to old endpoint + httpPath := c.createHttpPath(machine, path.Join(version, "machines")) + resp, err := c.httpClient.Get(httpPath) + if err != nil { + // try another machine in the cluster + continue + } + b, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + // try another machine in the cluster + continue + } + // update Machines List + c.cluster.updateFromStr(string(b)) + } else { + b, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + // try another machine in the cluster + continue + } + + var mCollection memberCollection + if err := json.Unmarshal(b, &mCollection); err != nil { + // try another machine + continue + } + + urls := make([]string, 0) + for _, m := range mCollection { + urls = append(urls, m.ClientURLs...) + } + + // update Machines List + c.cluster.updateFromStr(strings.Join(urls, ",")) + } + + logger.Debug("sync.machines ", c.cluster.Machines) + c.saveConfig() + return true + } + + return false +} + +// createHttpPath creates a complete HTTP URL. +// serverName should contain both the host name and a port number, if any. +func (c *Client) createHttpPath(serverName string, _path string) string { + u, err := url.Parse(serverName) + if err != nil { + panic(err) + } + + u.Path = path.Join(u.Path, _path) + + if u.Scheme == "" { + u.Scheme = "http" + } + return u.String() +} + +// dial attempts to open a TCP connection to the provided address, explicitly +// enabling keep-alives with a one-second interval. +func (c *Client) dial(network, addr string) (net.Conn, error) { + conn, err := net.DialTimeout(network, addr, c.config.DialTimeout) + if err != nil { + return nil, err + } + + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + return nil, errors.New("Failed type-assertion of net.Conn as *net.TCPConn") + } + + // Keep TCP alive to check whether or not the remote machine is down + if err = tcpConn.SetKeepAlive(true); err != nil { + return nil, err + } + + if err = tcpConn.SetKeepAlivePeriod(time.Second); err != nil { + return nil, err + } + + return tcpConn, nil +} + +func (c *Client) OpenCURL() { + c.cURLch = make(chan string, defaultBufferSize) +} + +func (c *Client) CloseCURL() { + c.cURLch = nil +} + +func (c *Client) sendCURL(command string) { + go func() { + select { + case c.cURLch <- command: + default: + } + }() +} + +func (c *Client) RecvCURL() string { + return <-c.cURLch +} + +// saveConfig saves the current config using c.persistence. +func (c *Client) saveConfig() error { + if c.persistence != nil { + b, err := json.Marshal(c) + if err != nil { + return err + } + + _, err = c.persistence.Write(b) + if err != nil { + return err + } + } + + return nil +} + +// MarshalJSON implements the Marshaller interface +// as defined by the standard JSON package. +func (c *Client) MarshalJSON() ([]byte, error) { + b, err := json.Marshal(struct { + Config Config `json:"config"` + Cluster *Cluster `json:"cluster"` + }{ + Config: c.config, + Cluster: c.cluster, + }) + + if err != nil { + return nil, err + } + + return b, nil +} + +// UnmarshalJSON implements the Unmarshaller interface +// as defined by the standard JSON package. +func (c *Client) UnmarshalJSON(b []byte) error { + temp := struct { + Config Config `json:"config"` + Cluster *Cluster `json:"cluster"` + }{} + err := json.Unmarshal(b, &temp) + if err != nil { + return err + } + + c.cluster = temp.Cluster + c.config = temp.Config + return nil +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client_test.go new file mode 100644 index 0000000000..4720d8d693 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/client_test.go @@ -0,0 +1,108 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "net" + "net/url" + "os" + "testing" +) + +// To pass this test, we need to create a cluster of 3 machines +// The server should be listening on localhost:4001, 4002, 4003 +func TestSync(t *testing.T) { + fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003") + + // Explicit trailing slash to ensure this doesn't reproduce: + // https://github.com/coreos/go-etcd/issues/82 + c := NewClient([]string{"http://127.0.0.1:4001/"}) + + success := c.SyncCluster() + if !success { + t.Fatal("cannot sync machines") + } + + for _, m := range c.GetCluster() { + u, err := url.Parse(m) + if err != nil { + t.Fatal(err) + } + if u.Scheme != "http" { + t.Fatal("scheme must be http") + } + + host, _, err := net.SplitHostPort(u.Host) + if err != nil { + t.Fatal(err) + } + if host != "localhost" { + t.Fatal("Host must be localhost") + } + } + + badMachines := []string{"abc", "edef"} + + success = c.SetCluster(badMachines) + + if success { + t.Fatal("should not sync on bad machines") + } + + goodMachines := []string{"127.0.0.1:4002"} + + success = c.SetCluster(goodMachines) + + if !success { + t.Fatal("cannot sync machines") + } else { + fmt.Println(c.cluster.Machines) + } + +} + +func TestPersistence(t *testing.T) { + c := NewClient(nil) + c.SyncCluster() + + fo, err := os.Create("config.json") + if err != nil { + t.Fatal(err) + } + defer func() { + if err := fo.Close(); err != nil { + panic(err) + } + }() + + c.SetPersistence(fo) + err = c.saveConfig() + if err != nil { + t.Fatal(err) + } + + c2, err := NewClientFromFile("config.json") + if err != nil { + t.Fatal(err) + } + + // Verify that the two clients have the same config + b1, _ := json.Marshal(c) + b2, _ := json.Marshal(c2) + + if string(b1) != string(b2) { + t.Fatalf("The two configs should be equal!") + } +} + +func TestClientRetry(t *testing.T) { + c := NewClient([]string{"http://strange", "http://127.0.0.1:4001"}) + // use first endpoint as the picked url + c.cluster.picked = 0 + if _, err := c.Set("foo", "bar", 5); err != nil { + t.Fatal(err) + } + if _, err := c.Delete("foo", true); err != nil { + t.Fatal(err) + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/cluster.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/cluster.go new file mode 100644 index 0000000000..1ad3e155be --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/cluster.go @@ -0,0 +1,37 @@ +package etcd + +import ( + "math/rand" + "strings" +) + +type Cluster struct { + Leader string `json:"leader"` + Machines []string `json:"machines"` + picked int +} + +func NewCluster(machines []string) *Cluster { + // if an empty slice was sent in then just assume HTTP 4001 on localhost + if len(machines) == 0 { + machines = []string{"http://127.0.0.1:4001"} + } + + // default leader and machines + return &Cluster{ + Leader: "", + Machines: machines, + picked: rand.Intn(len(machines)), + } +} + +func (cl *Cluster) failure() { cl.picked = rand.Intn(len(cl.Machines)) } +func (cl *Cluster) pick() string { return cl.Machines[cl.picked] } + +func (cl *Cluster) updateFromStr(machines string) { + cl.Machines = strings.Split(machines, ",") + for i := range cl.Machines { + cl.Machines[i] = strings.TrimSpace(cl.Machines[i]) + } + cl.picked = rand.Intn(len(cl.Machines)) +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_delete.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_delete.go new file mode 100644 index 0000000000..11131bb760 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_delete.go @@ -0,0 +1,34 @@ +package etcd + +import "fmt" + +func (c *Client) CompareAndDelete(key string, prevValue string, prevIndex uint64) (*Response, error) { + raw, err := c.RawCompareAndDelete(key, prevValue, prevIndex) + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +func (c *Client) RawCompareAndDelete(key string, prevValue string, prevIndex uint64) (*RawResponse, error) { + if prevValue == "" && prevIndex == 0 { + return nil, fmt.Errorf("You must give either prevValue or prevIndex.") + } + + options := Options{} + if prevValue != "" { + options["prevValue"] = prevValue + } + if prevIndex != 0 { + options["prevIndex"] = prevIndex + } + + raw, err := c.delete(key, options) + + if err != nil { + return nil, err + } + + return raw, err +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_delete_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_delete_test.go new file mode 100644 index 0000000000..223e50f291 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_delete_test.go @@ -0,0 +1,46 @@ +package etcd + +import ( + "testing" +) + +func TestCompareAndDelete(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("foo", true) + }() + + c.Set("foo", "bar", 5) + + // This should succeed an correct prevValue + resp, err := c.CompareAndDelete("foo", "bar", 0) + if err != nil { + t.Fatal(err) + } + if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) { + t.Fatalf("CompareAndDelete 1 prevNode failed: %#v", resp) + } + + resp, _ = c.Set("foo", "bar", 5) + // This should fail because it gives an incorrect prevValue + _, err = c.CompareAndDelete("foo", "xxx", 0) + if err == nil { + t.Fatalf("CompareAndDelete 2 should have failed. The response is: %#v", resp) + } + + // This should succeed because it gives an correct prevIndex + resp, err = c.CompareAndDelete("foo", "", resp.Node.ModifiedIndex) + if err != nil { + t.Fatal(err) + } + if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) { + t.Fatalf("CompareAndSwap 3 prevNode failed: %#v", resp) + } + + c.Set("foo", "bar", 5) + // This should fail because it gives an incorrect prevIndex + resp, err = c.CompareAndDelete("foo", "", 29817514) + if err == nil { + t.Fatalf("CompareAndDelete 4 should have failed. The response is: %#v", resp) + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_swap.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_swap.go new file mode 100644 index 0000000000..bb4f90643a --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_swap.go @@ -0,0 +1,36 @@ +package etcd + +import "fmt" + +func (c *Client) CompareAndSwap(key string, value string, ttl uint64, + prevValue string, prevIndex uint64) (*Response, error) { + raw, err := c.RawCompareAndSwap(key, value, ttl, prevValue, prevIndex) + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +func (c *Client) RawCompareAndSwap(key string, value string, ttl uint64, + prevValue string, prevIndex uint64) (*RawResponse, error) { + if prevValue == "" && prevIndex == 0 { + return nil, fmt.Errorf("You must give either prevValue or prevIndex.") + } + + options := Options{} + if prevValue != "" { + options["prevValue"] = prevValue + } + if prevIndex != 0 { + options["prevIndex"] = prevIndex + } + + raw, err := c.put(key, value, ttl, options) + + if err != nil { + return nil, err + } + + return raw, err +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go new file mode 100644 index 0000000000..14a1b00f5a --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/compare_and_swap_test.go @@ -0,0 +1,57 @@ +package etcd + +import ( + "testing" +) + +func TestCompareAndSwap(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("foo", true) + }() + + c.Set("foo", "bar", 5) + + // This should succeed + resp, err := c.CompareAndSwap("foo", "bar2", 5, "bar", 0) + if err != nil { + t.Fatal(err) + } + if !(resp.Node.Value == "bar2" && resp.Node.Key == "/foo" && resp.Node.TTL == 5) { + t.Fatalf("CompareAndSwap 1 failed: %#v", resp) + } + + if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) { + t.Fatalf("CompareAndSwap 1 prevNode failed: %#v", resp) + } + + // This should fail because it gives an incorrect prevValue + resp, err = c.CompareAndSwap("foo", "bar3", 5, "xxx", 0) + if err == nil { + t.Fatalf("CompareAndSwap 2 should have failed. The response is: %#v", resp) + } + + resp, err = c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + + // This should succeed + resp, err = c.CompareAndSwap("foo", "bar2", 5, "", resp.Node.ModifiedIndex) + if err != nil { + t.Fatal(err) + } + if !(resp.Node.Value == "bar2" && resp.Node.Key == "/foo" && resp.Node.TTL == 5) { + t.Fatalf("CompareAndSwap 3 failed: %#v", resp) + } + + if !(resp.PrevNode.Value == "bar" && resp.PrevNode.Key == "/foo" && resp.PrevNode.TTL == 5) { + t.Fatalf("CompareAndSwap 3 prevNode failed: %#v", resp) + } + + // This should fail because it gives an incorrect prevIndex + resp, err = c.CompareAndSwap("foo", "bar3", 5, "", 29817514) + if err == nil { + t.Fatalf("CompareAndSwap 4 should have failed. The response is: %#v", resp) + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/debug.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/debug.go new file mode 100644 index 0000000000..0f777886ba --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/debug.go @@ -0,0 +1,55 @@ +package etcd + +import ( + "fmt" + "io/ioutil" + "log" + "strings" +) + +var logger *etcdLogger + +func SetLogger(l *log.Logger) { + logger = &etcdLogger{l} +} + +func GetLogger() *log.Logger { + return logger.log +} + +type etcdLogger struct { + log *log.Logger +} + +func (p *etcdLogger) Debug(args ...interface{}) { + msg := "DEBUG: " + fmt.Sprint(args...) + p.log.Println(msg) +} + +func (p *etcdLogger) Debugf(f string, args ...interface{}) { + msg := "DEBUG: " + fmt.Sprintf(f, args...) + // Append newline if necessary + if !strings.HasSuffix(msg, "\n") { + msg = msg + "\n" + } + p.log.Print(msg) +} + +func (p *etcdLogger) Warning(args ...interface{}) { + msg := "WARNING: " + fmt.Sprint(args...) + p.log.Println(msg) +} + +func (p *etcdLogger) Warningf(f string, args ...interface{}) { + msg := "WARNING: " + fmt.Sprintf(f, args...) + // Append newline if necessary + if !strings.HasSuffix(msg, "\n") { + msg = msg + "\n" + } + p.log.Print(msg) +} + +func init() { + // Default logger uses the go default log. + SetLogger(log.New(ioutil.Discard, "go-etcd", log.LstdFlags)) +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/debug_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/debug_test.go new file mode 100644 index 0000000000..97f6d1110b --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/debug_test.go @@ -0,0 +1,28 @@ +package etcd + +import ( + "testing" +) + +type Foo struct{} +type Bar struct { + one string + two int +} + +// Tests that logs don't panic with arbitrary interfaces +func TestDebug(t *testing.T) { + f := &Foo{} + b := &Bar{"asfd", 3} + for _, test := range []interface{}{ + 1234, + "asdf", + f, + b, + } { + logger.Debug(test) + logger.Debugf("something, %s", test) + logger.Warning(test) + logger.Warningf("something, %s", test) + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/delete.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/delete.go new file mode 100644 index 0000000000..b37accd7db --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/delete.go @@ -0,0 +1,40 @@ +package etcd + +// Delete deletes the given key. +// +// When recursive set to false, if the key points to a +// directory the method will fail. +// +// When recursive set to true, if the key points to a file, +// the file will be deleted; if the key points to a directory, +// then everything under the directory (including all child directories) +// will be deleted. +func (c *Client) Delete(key string, recursive bool) (*Response, error) { + raw, err := c.RawDelete(key, recursive, false) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +// DeleteDir deletes an empty directory or a key value pair +func (c *Client) DeleteDir(key string) (*Response, error) { + raw, err := c.RawDelete(key, false, true) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +func (c *Client) RawDelete(key string, recursive bool, dir bool) (*RawResponse, error) { + ops := Options{ + "recursive": recursive, + "dir": dir, + } + + return c.delete(key, ops) +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/delete_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/delete_test.go new file mode 100644 index 0000000000..5904971556 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/delete_test.go @@ -0,0 +1,81 @@ +package etcd + +import ( + "testing" +) + +func TestDelete(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("foo", true) + }() + + c.Set("foo", "bar", 5) + resp, err := c.Delete("foo", false) + if err != nil { + t.Fatal(err) + } + + if !(resp.Node.Value == "") { + t.Fatalf("Delete failed with %s", resp.Node.Value) + } + + if !(resp.PrevNode.Value == "bar") { + t.Fatalf("Delete PrevNode failed with %s", resp.Node.Value) + } + + resp, err = c.Delete("foo", false) + if err == nil { + t.Fatalf("Delete should have failed because the key foo did not exist. "+ + "The response was: %v", resp) + } +} + +func TestDeleteAll(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("foo", true) + c.Delete("fooDir", true) + }() + + c.SetDir("foo", 5) + // test delete an empty dir + resp, err := c.DeleteDir("foo") + if err != nil { + t.Fatal(err) + } + + if !(resp.Node.Value == "") { + t.Fatalf("DeleteAll 1 failed: %#v", resp) + } + + if !(resp.PrevNode.Dir == true && resp.PrevNode.Value == "") { + t.Fatalf("DeleteAll 1 PrevNode failed: %#v", resp) + } + + c.CreateDir("fooDir", 5) + c.Set("fooDir/foo", "bar", 5) + _, err = c.DeleteDir("fooDir") + if err == nil { + t.Fatal("should not able to delete a non-empty dir with deletedir") + } + + resp, err = c.Delete("fooDir", true) + if err != nil { + t.Fatal(err) + } + + if !(resp.Node.Value == "") { + t.Fatalf("DeleteAll 2 failed: %#v", resp) + } + + if !(resp.PrevNode.Dir == true && resp.PrevNode.Value == "") { + t.Fatalf("DeleteAll 2 PrevNode failed: %#v", resp) + } + + resp, err = c.Delete("foo", true) + if err == nil { + t.Fatalf("DeleteAll should have failed because the key foo did not exist. "+ + "The response was: %v", resp) + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/error.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/error.go new file mode 100644 index 0000000000..66dca54b5c --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/error.go @@ -0,0 +1,49 @@ +package etcd + +import ( + "encoding/json" + "fmt" +) + +const ( + ErrCodeEtcdNotReachable = 501 + ErrCodeUnhandledHTTPStatus = 502 +) + +var ( + errorMap = map[int]string{ + ErrCodeEtcdNotReachable: "All the given peers are not reachable", + } +) + +type EtcdError struct { + ErrorCode int `json:"errorCode"` + Message string `json:"message"` + Cause string `json:"cause,omitempty"` + Index uint64 `json:"index"` +} + +func (e EtcdError) Error() string { + return fmt.Sprintf("%v: %v (%v) [%v]", e.ErrorCode, e.Message, e.Cause, e.Index) +} + +func newError(errorCode int, cause string, index uint64) *EtcdError { + return &EtcdError{ + ErrorCode: errorCode, + Message: errorMap[errorCode], + Cause: cause, + Index: index, + } +} + +func handleError(b []byte) error { + etcdErr := new(EtcdError) + + err := json.Unmarshal(b, etcdErr) + if err != nil { + logger.Warningf("cannot unmarshal etcd error: %v", err) + return err + } + + return etcdErr +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/get.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/get.go new file mode 100644 index 0000000000..09fe641c25 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/get.go @@ -0,0 +1,32 @@ +package etcd + +// Get gets the file or directory associated with the given key. +// If the key points to a directory, files and directories under +// it will be returned in sorted or unsorted order, depending on +// the sort flag. +// If recursive is set to false, contents under child directories +// will not be returned. +// If recursive is set to true, all the contents will be returned. +func (c *Client) Get(key string, sort, recursive bool) (*Response, error) { + raw, err := c.RawGet(key, sort, recursive) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +func (c *Client) RawGet(key string, sort, recursive bool) (*RawResponse, error) { + var q bool + if c.config.Consistency == STRONG_CONSISTENCY { + q = true + } + ops := Options{ + "recursive": recursive, + "sorted": sort, + "quorum": q, + } + + return c.get(key, ops) +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/get_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/get_test.go new file mode 100644 index 0000000000..279c4e26f8 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/get_test.go @@ -0,0 +1,131 @@ +package etcd + +import ( + "reflect" + "testing" +) + +// cleanNode scrubs Expiration, ModifiedIndex and CreatedIndex of a node. +func cleanNode(n *Node) { + n.Expiration = nil + n.ModifiedIndex = 0 + n.CreatedIndex = 0 +} + +// cleanResult scrubs a result object two levels deep of Expiration, +// ModifiedIndex and CreatedIndex. +func cleanResult(result *Response) { + // TODO(philips): make this recursive. + cleanNode(result.Node) + for i, _ := range result.Node.Nodes { + cleanNode(result.Node.Nodes[i]) + for j, _ := range result.Node.Nodes[i].Nodes { + cleanNode(result.Node.Nodes[i].Nodes[j]) + } + } +} + +func TestGet(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("foo", true) + }() + + c.Set("foo", "bar", 5) + + result, err := c.Get("foo", false, false) + + if err != nil { + t.Fatal(err) + } + + if result.Node.Key != "/foo" || result.Node.Value != "bar" { + t.Fatalf("Get failed with %s %s %v", result.Node.Key, result.Node.Value, result.Node.TTL) + } + + result, err = c.Get("goo", false, false) + if err == nil { + t.Fatalf("should not be able to get non-exist key") + } +} + +func TestGetAll(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("fooDir", true) + }() + + c.CreateDir("fooDir", 5) + c.Set("fooDir/k0", "v0", 5) + c.Set("fooDir/k1", "v1", 5) + + // Return kv-pairs in sorted order + result, err := c.Get("fooDir", true, false) + + if err != nil { + t.Fatal(err) + } + + expected := Nodes{ + &Node{ + Key: "/fooDir/k0", + Value: "v0", + TTL: 5, + }, + &Node{ + Key: "/fooDir/k1", + Value: "v1", + TTL: 5, + }, + } + + cleanResult(result) + + if !reflect.DeepEqual(result.Node.Nodes, expected) { + t.Fatalf("(actual) %v != (expected) %v", result.Node.Nodes, expected) + } + + // Test the `recursive` option + c.CreateDir("fooDir/childDir", 5) + c.Set("fooDir/childDir/k2", "v2", 5) + + // Return kv-pairs in sorted order + result, err = c.Get("fooDir", true, true) + + cleanResult(result) + + if err != nil { + t.Fatal(err) + } + + expected = Nodes{ + &Node{ + Key: "/fooDir/childDir", + Dir: true, + Nodes: Nodes{ + &Node{ + Key: "/fooDir/childDir/k2", + Value: "v2", + TTL: 5, + }, + }, + TTL: 5, + }, + &Node{ + Key: "/fooDir/k0", + Value: "v0", + TTL: 5, + }, + &Node{ + Key: "/fooDir/k1", + Value: "v1", + TTL: 5, + }, + } + + cleanResult(result) + + if !reflect.DeepEqual(result.Node.Nodes, expected) { + t.Fatalf("(actual) %v != (expected) %v", result.Node.Nodes, expected) + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member.go new file mode 100644 index 0000000000..5b13b28e1a --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member.go @@ -0,0 +1,30 @@ +package etcd + +import "encoding/json" + +type Member struct { + ID string `json:"id"` + Name string `json:"name"` + PeerURLs []string `json:"peerURLs"` + ClientURLs []string `json:"clientURLs"` +} + +type memberCollection []Member + +func (c *memberCollection) UnmarshalJSON(data []byte) error { + d := struct { + Members []Member + }{} + + if err := json.Unmarshal(data, &d); err != nil { + return err + } + + if d.Members == nil { + *c = make([]Member, 0) + return nil + } + + *c = d.Members + return nil +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go new file mode 100644 index 0000000000..53ebdd4bfd --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/member_test.go @@ -0,0 +1,71 @@ +package etcd + +import ( + "encoding/json" + "reflect" + "testing" +) + +func TestMemberCollectionUnmarshal(t *testing.T) { + tests := []struct { + body []byte + want memberCollection + }{ + { + body: []byte(`{"members":[]}`), + want: memberCollection([]Member{}), + }, + { + body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`), + want: memberCollection( + []Member{ + { + ID: "2745e2525fce8fe", + Name: "node3", + PeerURLs: []string{ + "http://127.0.0.1:7003", + }, + ClientURLs: []string{ + "http://127.0.0.1:4003", + }, + }, + { + ID: "42134f434382925", + Name: "node1", + PeerURLs: []string{ + "http://127.0.0.1:2380", + "http://127.0.0.1:7001", + }, + ClientURLs: []string{ + "http://127.0.0.1:2379", + "http://127.0.0.1:4001", + }, + }, + { + ID: "94088180e21eb87b", + Name: "node2", + PeerURLs: []string{ + "http://127.0.0.1:7002", + }, + ClientURLs: []string{ + "http://127.0.0.1:4002", + }, + }, + }, + ), + }, + } + + for i, tt := range tests { + var got memberCollection + err := json.Unmarshal(tt.body, &got) + if err != nil { + t.Errorf("#%d: unexpected error: %v", i, err) + continue + } + + if !reflect.DeepEqual(tt.want, got) { + t.Errorf("#%d: incorrect output: want=%#v, got=%#v", i, tt.want, got) + } + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/options.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/options.go new file mode 100644 index 0000000000..d21c96f080 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/options.go @@ -0,0 +1,72 @@ +package etcd + +import ( + "fmt" + "net/url" + "reflect" +) + +type Options map[string]interface{} + +// An internally-used data structure that represents a mapping +// between valid options and their kinds +type validOptions map[string]reflect.Kind + +// Valid options for GET, PUT, POST, DELETE +// Using CAPITALIZED_UNDERSCORE to emphasize that these +// values are meant to be used as constants. +var ( + VALID_GET_OPTIONS = validOptions{ + "recursive": reflect.Bool, + "quorum": reflect.Bool, + "sorted": reflect.Bool, + "wait": reflect.Bool, + "waitIndex": reflect.Uint64, + } + + VALID_PUT_OPTIONS = validOptions{ + "prevValue": reflect.String, + "prevIndex": reflect.Uint64, + "prevExist": reflect.Bool, + "dir": reflect.Bool, + } + + VALID_POST_OPTIONS = validOptions{} + + VALID_DELETE_OPTIONS = validOptions{ + "recursive": reflect.Bool, + "dir": reflect.Bool, + "prevValue": reflect.String, + "prevIndex": reflect.Uint64, + } +) + +// Convert options to a string of HTML parameters +func (ops Options) toParameters(validOps validOptions) (string, error) { + p := "?" + values := url.Values{} + + if ops == nil { + return "", nil + } + + for k, v := range ops { + // Check if the given option is valid (that it exists) + kind := validOps[k] + if kind == reflect.Invalid { + return "", fmt.Errorf("Invalid option: %v", k) + } + + // Check if the given option is of the valid type + t := reflect.TypeOf(v) + if kind != t.Kind() { + return "", fmt.Errorf("Option %s should be of %v kind, not of %v kind.", + k, kind, t.Kind()) + } + + values.Set(k, fmt.Sprintf("%v", v)) + } + + p += values.Encode() + return p, nil +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests.go new file mode 100644 index 0000000000..3c3f436bea --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests.go @@ -0,0 +1,403 @@ +package etcd + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "path" + "strings" + "sync" + "time" +) + +// Errors introduced by handling requests +var ( + ErrRequestCancelled = errors.New("sending request is cancelled") +) + +type RawRequest struct { + Method string + RelativePath string + Values url.Values + Cancel <-chan bool +} + +// NewRawRequest returns a new RawRequest +func NewRawRequest(method, relativePath string, values url.Values, cancel <-chan bool) *RawRequest { + return &RawRequest{ + Method: method, + RelativePath: relativePath, + Values: values, + Cancel: cancel, + } +} + +// getCancelable issues a cancelable GET request +func (c *Client) getCancelable(key string, options Options, + cancel <-chan bool) (*RawResponse, error) { + logger.Debugf("get %s [%s]", key, c.cluster.pick()) + p := keyToPath(key) + + str, err := options.toParameters(VALID_GET_OPTIONS) + if err != nil { + return nil, err + } + p += str + + req := NewRawRequest("GET", p, nil, cancel) + resp, err := c.SendRequest(req) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// get issues a GET request +func (c *Client) get(key string, options Options) (*RawResponse, error) { + return c.getCancelable(key, options, nil) +} + +// put issues a PUT request +func (c *Client) put(key string, value string, ttl uint64, + options Options) (*RawResponse, error) { + + logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.pick()) + p := keyToPath(key) + + str, err := options.toParameters(VALID_PUT_OPTIONS) + if err != nil { + return nil, err + } + p += str + + req := NewRawRequest("PUT", p, buildValues(value, ttl), nil) + resp, err := c.SendRequest(req) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// post issues a POST request +func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) { + logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.pick()) + p := keyToPath(key) + + req := NewRawRequest("POST", p, buildValues(value, ttl), nil) + resp, err := c.SendRequest(req) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// delete issues a DELETE request +func (c *Client) delete(key string, options Options) (*RawResponse, error) { + logger.Debugf("delete %s [%s]", key, c.cluster.pick()) + p := keyToPath(key) + + str, err := options.toParameters(VALID_DELETE_OPTIONS) + if err != nil { + return nil, err + } + p += str + + req := NewRawRequest("DELETE", p, nil, nil) + resp, err := c.SendRequest(req) + + if err != nil { + return nil, err + } + + return resp, nil +} + +// SendRequest sends a HTTP request and returns a Response as defined by etcd +func (c *Client) SendRequest(rr *RawRequest) (*RawResponse, error) { + var req *http.Request + var resp *http.Response + var httpPath string + var err error + var respBody []byte + + var numReqs = 1 + + checkRetry := c.CheckRetry + if checkRetry == nil { + checkRetry = DefaultCheckRetry + } + + cancelled := make(chan bool, 1) + reqLock := new(sync.Mutex) + + if rr.Cancel != nil { + cancelRoutine := make(chan bool) + defer close(cancelRoutine) + + go func() { + select { + case <-rr.Cancel: + cancelled <- true + logger.Debug("send.request is cancelled") + case <-cancelRoutine: + return + } + + // Repeat canceling request until this thread is stopped + // because we have no idea about whether it succeeds. + for { + reqLock.Lock() + c.httpClient.Transport.(*http.Transport).CancelRequest(req) + reqLock.Unlock() + + select { + case <-time.After(100 * time.Millisecond): + case <-cancelRoutine: + return + } + } + }() + } + + // If we connect to a follower and consistency is required, retry until + // we connect to a leader + sleep := 25 * time.Millisecond + maxSleep := time.Second + + for attempt := 0; ; attempt++ { + if attempt > 0 { + select { + case <-cancelled: + return nil, ErrRequestCancelled + case <-time.After(sleep): + sleep = sleep * 2 + if sleep > maxSleep { + sleep = maxSleep + } + } + } + + logger.Debug("Connecting to etcd: attempt ", attempt+1, " for ", rr.RelativePath) + + // get httpPath if not set + if httpPath == "" { + httpPath = c.getHttpPath(rr.RelativePath) + } + + // Return a cURL command if curlChan is set + if c.cURLch != nil { + command := fmt.Sprintf("curl -X %s %s", rr.Method, httpPath) + for key, value := range rr.Values { + command += fmt.Sprintf(" -d %s=%s", key, value[0]) + } + if c.credentials != nil { + command += fmt.Sprintf(" -u %s", c.credentials.username) + } + c.sendCURL(command) + } + + logger.Debug("send.request.to ", httpPath, " | method ", rr.Method) + + req, err := func() (*http.Request, error) { + reqLock.Lock() + defer reqLock.Unlock() + + if rr.Values == nil { + if req, err = http.NewRequest(rr.Method, httpPath, nil); err != nil { + return nil, err + } + } else { + body := strings.NewReader(rr.Values.Encode()) + if req, err = http.NewRequest(rr.Method, httpPath, body); err != nil { + return nil, err + } + + req.Header.Set("Content-Type", + "application/x-www-form-urlencoded; param=value") + } + return req, nil + }() + + if err != nil { + return nil, err + } + + if c.credentials != nil { + req.SetBasicAuth(c.credentials.username, c.credentials.password) + } + + resp, err = c.httpClient.Do(req) + // clear previous httpPath + httpPath = "" + defer func() { + if resp != nil { + resp.Body.Close() + } + }() + + // If the request was cancelled, return ErrRequestCancelled directly + select { + case <-cancelled: + return nil, ErrRequestCancelled + default: + } + + numReqs++ + + // network error, change a machine! + if err != nil { + logger.Debug("network error: ", err.Error()) + lastResp := http.Response{} + if checkErr := checkRetry(c.cluster, numReqs, lastResp, err); checkErr != nil { + return nil, checkErr + } + + c.cluster.failure() + continue + } + + // if there is no error, it should receive response + logger.Debug("recv.response.from ", httpPath) + + if validHttpStatusCode[resp.StatusCode] { + // try to read byte code and break the loop + respBody, err = ioutil.ReadAll(resp.Body) + if err == nil { + logger.Debug("recv.success ", httpPath) + break + } + // ReadAll error may be caused due to cancel request + select { + case <-cancelled: + return nil, ErrRequestCancelled + default: + } + + if err == io.ErrUnexpectedEOF { + // underlying connection was closed prematurely, probably by timeout + // TODO: empty body or unexpectedEOF can cause http.Transport to get hosed; + // this allows the client to detect that and take evasive action. Need + // to revisit once code.google.com/p/go/issues/detail?id=8648 gets fixed. + respBody = []byte{} + break + } + } + + if resp.StatusCode == http.StatusTemporaryRedirect { + u, err := resp.Location() + + if err != nil { + logger.Warning(err) + } else { + // set httpPath for following redirection + httpPath = u.String() + } + resp.Body.Close() + continue + } + + if checkErr := checkRetry(c.cluster, numReqs, *resp, + errors.New("Unexpected HTTP status code")); checkErr != nil { + return nil, checkErr + } + resp.Body.Close() + } + + r := &RawResponse{ + StatusCode: resp.StatusCode, + Body: respBody, + Header: resp.Header, + } + + return r, nil +} + +// DefaultCheckRetry defines the retrying behaviour for bad HTTP requests +// If we have retried 2 * machine number, stop retrying. +// If status code is InternalServerError, sleep for 200ms. +func DefaultCheckRetry(cluster *Cluster, numReqs int, lastResp http.Response, + err error) error { + + if numReqs > 2*len(cluster.Machines) { + errStr := fmt.Sprintf("failed to propose on members %v twice [last error: %v]", cluster.Machines, err) + return newError(ErrCodeEtcdNotReachable, errStr, 0) + } + + if isEmptyResponse(lastResp) { + // always retry if it failed to get response from one machine + return nil + } + if !shouldRetry(lastResp) { + body := []byte("nil") + if lastResp.Body != nil { + if b, err := ioutil.ReadAll(lastResp.Body); err == nil { + body = b + } + } + errStr := fmt.Sprintf("unhandled http status [%s] with body [%s]", http.StatusText(lastResp.StatusCode), body) + return newError(ErrCodeUnhandledHTTPStatus, errStr, 0) + } + // sleep some time and expect leader election finish + time.Sleep(time.Millisecond * 200) + logger.Warning("bad response status code", lastResp.StatusCode) + return nil +} + +func isEmptyResponse(r http.Response) bool { return r.StatusCode == 0 } + +// shouldRetry returns whether the reponse deserves retry. +func shouldRetry(r http.Response) bool { + // TODO: only retry when the cluster is in leader election + // We cannot do it exactly because etcd doesn't support it well. + return r.StatusCode == http.StatusInternalServerError +} + +func (c *Client) getHttpPath(s ...string) string { + fullPath := c.cluster.pick() + "/" + version + for _, seg := range s { + fullPath = fullPath + "/" + seg + } + return fullPath +} + +// buildValues builds a url.Values map according to the given value and ttl +func buildValues(value string, ttl uint64) url.Values { + v := url.Values{} + + if value != "" { + v.Set("value", value) + } + + if ttl > 0 { + v.Set("ttl", fmt.Sprintf("%v", ttl)) + } + + return v +} + +// convert key string to http path exclude version, including URL escaping +// for example: key[foo] -> path[keys/foo] +// key[/%z] -> path[keys/%25z] +// key[/] -> path[keys/] +func keyToPath(key string) string { + // URL-escape our key, except for slashes + p := strings.Replace(url.QueryEscape(path.Join("keys", key)), "%2F", "/", -1) + + // corner case: if key is "/" or "//" ect + // path join will clear the tailing "/" + // we need to add it back + if p == "keys" { + p = "keys/" + } + + return p +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests_test.go new file mode 100644 index 0000000000..7a2bd190a1 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/requests_test.go @@ -0,0 +1,22 @@ +package etcd + +import "testing" + +func TestKeyToPath(t *testing.T) { + tests := []struct { + key string + wpath string + }{ + {"", "keys/"}, + {"foo", "keys/foo"}, + {"foo/bar", "keys/foo/bar"}, + {"%z", "keys/%25z"}, + {"/", "keys/"}, + } + for i, tt := range tests { + path := keyToPath(tt.key) + if path != tt.wpath { + t.Errorf("#%d: path = %s, want %s", i, path, tt.wpath) + } + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/response.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/response.go new file mode 100644 index 0000000000..1fe9b4e871 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/response.go @@ -0,0 +1,89 @@ +package etcd + +import ( + "encoding/json" + "net/http" + "strconv" + "time" +) + +const ( + rawResponse = iota + normalResponse +) + +type responseType int + +type RawResponse struct { + StatusCode int + Body []byte + Header http.Header +} + +var ( + validHttpStatusCode = map[int]bool{ + http.StatusCreated: true, + http.StatusOK: true, + http.StatusBadRequest: true, + http.StatusNotFound: true, + http.StatusPreconditionFailed: true, + http.StatusForbidden: true, + } +) + +// Unmarshal parses RawResponse and stores the result in Response +func (rr *RawResponse) Unmarshal() (*Response, error) { + if rr.StatusCode != http.StatusOK && rr.StatusCode != http.StatusCreated { + return nil, handleError(rr.Body) + } + + resp := new(Response) + + err := json.Unmarshal(rr.Body, resp) + + if err != nil { + return nil, err + } + + // attach index and term to response + resp.EtcdIndex, _ = strconv.ParseUint(rr.Header.Get("X-Etcd-Index"), 10, 64) + resp.RaftIndex, _ = strconv.ParseUint(rr.Header.Get("X-Raft-Index"), 10, 64) + resp.RaftTerm, _ = strconv.ParseUint(rr.Header.Get("X-Raft-Term"), 10, 64) + + return resp, nil +} + +type Response struct { + Action string `json:"action"` + Node *Node `json:"node"` + PrevNode *Node `json:"prevNode,omitempty"` + EtcdIndex uint64 `json:"etcdIndex"` + RaftIndex uint64 `json:"raftIndex"` + RaftTerm uint64 `json:"raftTerm"` +} + +type Node struct { + Key string `json:"key, omitempty"` + Value string `json:"value,omitempty"` + Dir bool `json:"dir,omitempty"` + Expiration *time.Time `json:"expiration,omitempty"` + TTL int64 `json:"ttl,omitempty"` + Nodes Nodes `json:"nodes,omitempty"` + ModifiedIndex uint64 `json:"modifiedIndex,omitempty"` + CreatedIndex uint64 `json:"createdIndex,omitempty"` +} + +type Nodes []*Node + +// interfaces for sorting +func (ns Nodes) Len() int { + return len(ns) +} + +func (ns Nodes) Less(i, j int) bool { + return ns[i].Key < ns[j].Key +} + +func (ns Nodes) Swap(i, j int) { + ns[i], ns[j] = ns[j], ns[i] +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go new file mode 100644 index 0000000000..87c86b8308 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_curl_chan_test.go @@ -0,0 +1,42 @@ +package etcd + +import ( + "fmt" + "testing" +) + +func TestSetCurlChan(t *testing.T) { + c := NewClient(nil) + c.OpenCURL() + + defer func() { + c.Delete("foo", true) + }() + + _, err := c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + + expected := fmt.Sprintf("curl -X PUT %s/v2/keys/foo -d value=bar -d ttl=5", + c.cluster.pick()) + actual := c.RecvCURL() + if expected != actual { + t.Fatalf(`Command "%s" is not equal to expected value "%s"`, + actual, expected) + } + + c.SetConsistency(STRONG_CONSISTENCY) + _, err = c.Get("foo", false, false) + if err != nil { + t.Fatal(err) + } + + expected = fmt.Sprintf("curl -X GET %s/v2/keys/foo?quorum=true&recursive=false&sorted=false", + c.cluster.pick()) + actual = c.RecvCURL() + if expected != actual { + t.Fatalf(`Command "%s" is not equal to expected value "%s"`, + actual, expected) + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_update_create.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_update_create.go new file mode 100644 index 0000000000..e2840cf356 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_update_create.go @@ -0,0 +1,137 @@ +package etcd + +// Set sets the given key to the given value. +// It will create a new key value pair or replace the old one. +// It will not replace a existing directory. +func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) { + raw, err := c.RawSet(key, value, ttl) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +// SetDir sets the given key to a directory. +// It will create a new directory or replace the old key value pair by a directory. +// It will not replace a existing directory. +func (c *Client) SetDir(key string, ttl uint64) (*Response, error) { + raw, err := c.RawSetDir(key, ttl) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +// CreateDir creates a directory. It succeeds only if +// the given key does not yet exist. +func (c *Client) CreateDir(key string, ttl uint64) (*Response, error) { + raw, err := c.RawCreateDir(key, ttl) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +// UpdateDir updates the given directory. It succeeds only if the +// given key already exists. +func (c *Client) UpdateDir(key string, ttl uint64) (*Response, error) { + raw, err := c.RawUpdateDir(key, ttl) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +// Create creates a file with the given value under the given key. It succeeds +// only if the given key does not yet exist. +func (c *Client) Create(key string, value string, ttl uint64) (*Response, error) { + raw, err := c.RawCreate(key, value, ttl) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +// CreateInOrder creates a file with a key that's guaranteed to be higher than other +// keys in the given directory. It is useful for creating queues. +func (c *Client) CreateInOrder(dir string, value string, ttl uint64) (*Response, error) { + raw, err := c.RawCreateInOrder(dir, value, ttl) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +// Update updates the given key to the given value. It succeeds only if the +// given key already exists. +func (c *Client) Update(key string, value string, ttl uint64) (*Response, error) { + raw, err := c.RawUpdate(key, value, ttl) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() +} + +func (c *Client) RawUpdateDir(key string, ttl uint64) (*RawResponse, error) { + ops := Options{ + "prevExist": true, + "dir": true, + } + + return c.put(key, "", ttl, ops) +} + +func (c *Client) RawCreateDir(key string, ttl uint64) (*RawResponse, error) { + ops := Options{ + "prevExist": false, + "dir": true, + } + + return c.put(key, "", ttl, ops) +} + +func (c *Client) RawSet(key string, value string, ttl uint64) (*RawResponse, error) { + return c.put(key, value, ttl, nil) +} + +func (c *Client) RawSetDir(key string, ttl uint64) (*RawResponse, error) { + ops := Options{ + "dir": true, + } + + return c.put(key, "", ttl, ops) +} + +func (c *Client) RawUpdate(key string, value string, ttl uint64) (*RawResponse, error) { + ops := Options{ + "prevExist": true, + } + + return c.put(key, value, ttl, ops) +} + +func (c *Client) RawCreate(key string, value string, ttl uint64) (*RawResponse, error) { + ops := Options{ + "prevExist": false, + } + + return c.put(key, value, ttl, ops) +} + +func (c *Client) RawCreateInOrder(dir string, value string, ttl uint64) (*RawResponse, error) { + return c.post(dir, value, ttl) +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_update_create_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_update_create_test.go new file mode 100644 index 0000000000..ced0f06e7b --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/set_update_create_test.go @@ -0,0 +1,241 @@ +package etcd + +import ( + "testing" +) + +func TestSet(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("foo", true) + }() + + resp, err := c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + if resp.Node.Key != "/foo" || resp.Node.Value != "bar" || resp.Node.TTL != 5 { + t.Fatalf("Set 1 failed: %#v", resp) + } + if resp.PrevNode != nil { + t.Fatalf("Set 1 PrevNode failed: %#v", resp) + } + + resp, err = c.Set("foo", "bar2", 5) + if err != nil { + t.Fatal(err) + } + if !(resp.Node.Key == "/foo" && resp.Node.Value == "bar2" && resp.Node.TTL == 5) { + t.Fatalf("Set 2 failed: %#v", resp) + } + if resp.PrevNode.Key != "/foo" || resp.PrevNode.Value != "bar" || resp.Node.TTL != 5 { + t.Fatalf("Set 2 PrevNode failed: %#v", resp) + } +} + +func TestUpdate(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("foo", true) + c.Delete("nonexistent", true) + }() + + resp, err := c.Set("foo", "bar", 5) + + if err != nil { + t.Fatal(err) + } + + // This should succeed. + resp, err = c.Update("foo", "wakawaka", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "update" && resp.Node.Key == "/foo" && resp.Node.TTL == 5) { + t.Fatalf("Update 1 failed: %#v", resp) + } + if !(resp.PrevNode.Key == "/foo" && resp.PrevNode.Value == "bar" && resp.Node.TTL == 5) { + t.Fatalf("Update 1 prevValue failed: %#v", resp) + } + + // This should fail because the key does not exist. + resp, err = c.Update("nonexistent", "whatever", 5) + if err == nil { + t.Fatalf("The key %v did not exist, so the update should have failed."+ + "The response was: %#v", resp.Node.Key, resp) + } +} + +func TestCreate(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("newKey", true) + }() + + newKey := "/newKey" + newValue := "/newValue" + + // This should succeed + resp, err := c.Create(newKey, newValue, 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "create" && resp.Node.Key == newKey && + resp.Node.Value == newValue && resp.Node.TTL == 5) { + t.Fatalf("Create 1 failed: %#v", resp) + } + if resp.PrevNode != nil { + t.Fatalf("Create 1 PrevNode failed: %#v", resp) + } + + // This should fail, because the key is already there + resp, err = c.Create(newKey, newValue, 5) + if err == nil { + t.Fatalf("The key %v did exist, so the creation should have failed."+ + "The response was: %#v", resp.Node.Key, resp) + } +} + +func TestCreateInOrder(t *testing.T) { + c := NewClient(nil) + dir := "/queue" + defer func() { + c.DeleteDir(dir) + }() + + var firstKey, secondKey string + + resp, err := c.CreateInOrder(dir, "1", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "create" && resp.Node.Value == "1" && resp.Node.TTL == 5) { + t.Fatalf("Create 1 failed: %#v", resp) + } + + firstKey = resp.Node.Key + + resp, err = c.CreateInOrder(dir, "2", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "create" && resp.Node.Value == "2" && resp.Node.TTL == 5) { + t.Fatalf("Create 2 failed: %#v", resp) + } + + secondKey = resp.Node.Key + + if firstKey >= secondKey { + t.Fatalf("Expected first key to be greater than second key, but %s is not greater than %s", + firstKey, secondKey) + } +} + +func TestSetDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("foo", true) + c.Delete("fooDir", true) + }() + + resp, err := c.CreateDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + if !(resp.Node.Key == "/fooDir" && resp.Node.Value == "" && resp.Node.TTL == 5) { + t.Fatalf("SetDir 1 failed: %#v", resp) + } + if resp.PrevNode != nil { + t.Fatalf("SetDir 1 PrevNode failed: %#v", resp) + } + + // This should fail because /fooDir already points to a directory + resp, err = c.CreateDir("/fooDir", 5) + if err == nil { + t.Fatalf("fooDir already points to a directory, so SetDir should have failed."+ + "The response was: %#v", resp) + } + + _, err = c.Set("foo", "bar", 5) + if err != nil { + t.Fatal(err) + } + + // This should succeed + // It should replace the key + resp, err = c.SetDir("foo", 5) + if err != nil { + t.Fatal(err) + } + if !(resp.Node.Key == "/foo" && resp.Node.Value == "" && resp.Node.TTL == 5) { + t.Fatalf("SetDir 2 failed: %#v", resp) + } + if !(resp.PrevNode.Key == "/foo" && resp.PrevNode.Value == "bar" && resp.PrevNode.TTL == 5) { + t.Fatalf("SetDir 2 failed: %#v", resp) + } +} + +func TestUpdateDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("fooDir", true) + }() + + resp, err := c.CreateDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + // This should succeed. + resp, err = c.UpdateDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "update" && resp.Node.Key == "/fooDir" && + resp.Node.Value == "" && resp.Node.TTL == 5) { + t.Fatalf("UpdateDir 1 failed: %#v", resp) + } + if !(resp.PrevNode.Key == "/fooDir" && resp.PrevNode.Dir == true && resp.PrevNode.TTL == 5) { + t.Fatalf("UpdateDir 1 PrevNode failed: %#v", resp) + } + + // This should fail because the key does not exist. + resp, err = c.UpdateDir("nonexistentDir", 5) + if err == nil { + t.Fatalf("The key %v did not exist, so the update should have failed."+ + "The response was: %#v", resp.Node.Key, resp) + } +} + +func TestCreateDir(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("fooDir", true) + }() + + // This should succeed + resp, err := c.CreateDir("fooDir", 5) + if err != nil { + t.Fatal(err) + } + + if !(resp.Action == "create" && resp.Node.Key == "/fooDir" && + resp.Node.Value == "" && resp.Node.TTL == 5) { + t.Fatalf("CreateDir 1 failed: %#v", resp) + } + if resp.PrevNode != nil { + t.Fatalf("CreateDir 1 PrevNode failed: %#v", resp) + } + + // This should fail, because the key is already there + resp, err = c.CreateDir("fooDir", 5) + if err == nil { + t.Fatalf("The key %v did exist, so the creation should have failed."+ + "The response was: %#v", resp.Node.Key, resp) + } +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/version.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/version.go new file mode 100644 index 0000000000..b1e9ed2713 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/version.go @@ -0,0 +1,6 @@ +package etcd + +const ( + version = "v2" + packageVersion = "v2.0.0+git" +) diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/watch.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/watch.go new file mode 100644 index 0000000000..aa8d3df301 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/watch.go @@ -0,0 +1,103 @@ +package etcd + +import ( + "errors" +) + +// Errors introduced by the Watch command. +var ( + ErrWatchStoppedByUser = errors.New("Watch stopped by the user via stop channel") +) + +// If recursive is set to true the watch returns the first change under the given +// prefix since the given index. +// +// If recursive is set to false the watch returns the first change to the given key +// since the given index. +// +// To watch for the latest change, set waitIndex = 0. +// +// If a receiver channel is given, it will be a long-term watch. Watch will block at the +//channel. After someone receives the channel, it will go on to watch that +// prefix. If a stop channel is given, the client can close long-term watch using +// the stop channel. +func (c *Client) Watch(prefix string, waitIndex uint64, recursive bool, + receiver chan *Response, stop chan bool) (*Response, error) { + logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader) + if receiver == nil { + raw, err := c.watchOnce(prefix, waitIndex, recursive, stop) + + if err != nil { + return nil, err + } + + return raw.Unmarshal() + } + defer close(receiver) + + for { + raw, err := c.watchOnce(prefix, waitIndex, recursive, stop) + + if err != nil { + return nil, err + } + + resp, err := raw.Unmarshal() + + if err != nil { + return nil, err + } + + waitIndex = resp.Node.ModifiedIndex + 1 + receiver <- resp + } +} + +func (c *Client) RawWatch(prefix string, waitIndex uint64, recursive bool, + receiver chan *RawResponse, stop chan bool) (*RawResponse, error) { + + logger.Debugf("rawWatch %s [%s]", prefix, c.cluster.Leader) + if receiver == nil { + return c.watchOnce(prefix, waitIndex, recursive, stop) + } + + for { + raw, err := c.watchOnce(prefix, waitIndex, recursive, stop) + + if err != nil { + return nil, err + } + + resp, err := raw.Unmarshal() + + if err != nil { + return nil, err + } + + waitIndex = resp.Node.ModifiedIndex + 1 + receiver <- raw + } +} + +// helper func +// return when there is change under the given prefix +func (c *Client) watchOnce(key string, waitIndex uint64, recursive bool, stop chan bool) (*RawResponse, error) { + + options := Options{ + "wait": true, + } + if waitIndex > 0 { + options["waitIndex"] = waitIndex + } + if recursive { + options["recursive"] = true + } + + resp, err := c.getCancelable(key, options, stop) + + if err == ErrRequestCancelled { + return nil, ErrWatchStoppedByUser + } + + return resp, err +} diff --git a/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/watch_test.go b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/watch_test.go new file mode 100644 index 0000000000..43e1dfeb81 --- /dev/null +++ b/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd/watch_test.go @@ -0,0 +1,119 @@ +package etcd + +import ( + "fmt" + "runtime" + "testing" + "time" +) + +func TestWatch(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("watch_foo", true) + }() + + go setHelper("watch_foo", "bar", c) + + resp, err := c.Watch("watch_foo", 0, false, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Node.Key == "/watch_foo" && resp.Node.Value == "bar") { + t.Fatalf("Watch 1 failed: %#v", resp) + } + + go setHelper("watch_foo", "bar", c) + + resp, err = c.Watch("watch_foo", resp.Node.ModifiedIndex+1, false, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Node.Key == "/watch_foo" && resp.Node.Value == "bar") { + t.Fatalf("Watch 2 failed: %#v", resp) + } + + routineNum := runtime.NumGoroutine() + + ch := make(chan *Response, 10) + stop := make(chan bool, 1) + + go setLoop("watch_foo", "bar", c) + + go receiver(ch, stop) + + _, err = c.Watch("watch_foo", 0, false, ch, stop) + if err != ErrWatchStoppedByUser { + t.Fatalf("Watch returned a non-user stop error") + } + + if newRoutineNum := runtime.NumGoroutine(); newRoutineNum != routineNum { + t.Fatalf("Routine numbers differ after watch stop: %v, %v", routineNum, newRoutineNum) + } +} + +func TestWatchAll(t *testing.T) { + c := NewClient(nil) + defer func() { + c.Delete("watch_foo", true) + }() + + go setHelper("watch_foo/foo", "bar", c) + + resp, err := c.Watch("watch_foo", 0, true, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Node.Key == "/watch_foo/foo" && resp.Node.Value == "bar") { + t.Fatalf("WatchAll 1 failed: %#v", resp) + } + + go setHelper("watch_foo/foo", "bar", c) + + resp, err = c.Watch("watch_foo", resp.Node.ModifiedIndex+1, true, nil, nil) + if err != nil { + t.Fatal(err) + } + if !(resp.Node.Key == "/watch_foo/foo" && resp.Node.Value == "bar") { + t.Fatalf("WatchAll 2 failed: %#v", resp) + } + + ch := make(chan *Response, 10) + stop := make(chan bool, 1) + + routineNum := runtime.NumGoroutine() + + go setLoop("watch_foo/foo", "bar", c) + + go receiver(ch, stop) + + _, err = c.Watch("watch_foo", 0, true, ch, stop) + if err != ErrWatchStoppedByUser { + t.Fatalf("Watch returned a non-user stop error") + } + + if newRoutineNum := runtime.NumGoroutine(); newRoutineNum != routineNum { + t.Fatalf("Routine numbers differ after watch stop: %v, %v", routineNum, newRoutineNum) + } +} + +func setHelper(key, value string, c *Client) { + time.Sleep(time.Second) + c.Set(key, value, 100) +} + +func setLoop(key, value string, c *Client) { + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + newValue := fmt.Sprintf("%s_%v", value, i) + c.Set(key, newValue, 100) + time.Sleep(time.Second / 10) + } +} + +func receiver(c chan *Response, stop chan bool) { + for i := 0; i < 10; i++ { + <-c + } + stop <- true +} diff --git a/physical/etcd.go b/physical/etcd.go new file mode 100644 index 0000000000..5b8059ab70 --- /dev/null +++ b/physical/etcd.go @@ -0,0 +1,169 @@ +package physical + +import ( + "encoding/base64" + "errors" + "path/filepath" + "strings" + "time" + + "github.com/armon/go-metrics" + "github.com/coreos/go-etcd/etcd" +) + +const ( + + // Ideally, this prefix would match the "_" used in the file backend, but + // that prefix has special meaining in etcd. Specifically, it excludes those + // entries from directory listings. + EtcdNodeFilePrefix = "." + + // The delimiter is the same as the `-C` flag of etcdctl. + EtcdMachineDelimiter = "," +) + +var ( + EtcdSyncClusterError = errors.New("client setup failed: unable to sync etcd cluster") +) + +// errorIsMissingKey returns true if the given error is an etcd error with an +// error code corresponding to a missing key. +func errorIsMissingKey(err error) bool { + etcdErr, ok := err.(*etcd.EtcdError) + return ok && etcdErr.ErrorCode == 100 +} + +// EtcdBackend is a physical backend that stores data at specific +// prefix within Etcd. It is used for most production situations as +// it allows Vault to run on multiple machines in a highly-available manner. +type EtcdBackend struct { + path string + client *etcd.Client +} + +// newEtcdBackend constructs a etcd backend using a given machine address. +func newEtcdBackend(conf map[string]string) (Backend, error) { + + // Get the etcd path form the configuration. + path, ok := conf["path"] + if !ok { + path = "/vault" + } + + // Ensure path is prefixed. + if !strings.HasPrefix(path, "/") { + path = "/" + path + } + + // Set a default machines list and check for an overriding address value. + machines := "http://128.0.0.1:4001" + if address, ok := conf["address"]; ok { + machines = address + } + + // Create a new client from the supplied addres and attempt to sync with the + // cluster. + client := etcd.NewClient(strings.Split(machines, EtcdMachineDelimiter)) + if !client.SyncCluster() { + return nil, EtcdSyncClusterError + } + + // Setup the backend + return &EtcdBackend{ + path: path, + client: client, + }, nil +} + +// Put is used to insert or update an entry. +func (c *EtcdBackend) Put(entry *Entry) error { + defer metrics.MeasureSince([]string{"etcd", "put"}, time.Now()) + value := base64.StdEncoding.EncodeToString(entry.Value) + _, err := c.client.Set(c.nodePath(entry.Key), value, 0) + return err +} + +// Get is used to fetch an entry. +func (c *EtcdBackend) Get(key string) (*Entry, error) { + defer metrics.MeasureSince([]string{"etcd", "get"}, time.Now()) + + response, err := c.client.Get(c.nodePath(key), false, false) + if err != nil { + if errorIsMissingKey(err) { + return nil, nil + } + return nil, err + } + + // Decode the stored value from base-64. + value, err := base64.StdEncoding.DecodeString(response.Node.Value) + if err != nil { + return nil, err + } + + // Construct and return a new entry. + return &Entry{ + Key: key, + Value: value, + }, nil +} + +// Delete is used to permanently delete an entry. +func (c *EtcdBackend) Delete(key string) error { + defer metrics.MeasureSince([]string{"etcd", "delete"}, time.Now()) + + // Remove the key, non-recursively. + _, err := c.client.Delete(c.nodePath(key), false) + if err != nil && !errorIsMissingKey(err) { + return err + } + + return nil +} + +// List is used to list all the keys under a given prefix, up to the next +// prefix. +func (c *EtcdBackend) List(prefix string) ([]string, error) { + defer metrics.MeasureSince([]string{"etcd", "list"}, time.Now()) + + // Set a directory path from the given prefix. + path := c.nodePathDir(prefix) + + // Get the directory, non-recursively, from etcd. If the directory is + // missing, we just return an empty list of contents. + response, err := c.client.Get(path, true, false) + if err != nil { + if errorIsMissingKey(err) { + return []string{}, nil + } + return nil, err + } + + out := make([]string, len(response.Node.Nodes)) + for i, node := range response.Node.Nodes { + + // etcd keys include the full path, so let's trim the prefix directory + // path. + name := strings.TrimPrefix(node.Key, path) + + // Check if this node is itself a directory. If it is, add a trailing + // slash; if it isn't remove the node file prefix. + if node.Dir { + out[i] = name + "/" + } else { + out[i] = name[1:] + } + } + + return out, nil +} + +// nodePath returns an etcd filepath based on the given key. +func (b *EtcdBackend) nodePath(key string) string { + return filepath.Join(b.path, filepath.Dir(key), EtcdNodeFilePrefix+filepath.Base(key)) +} + +// nodePathDir returns an etcd directory path based on the given key. +func (b *EtcdBackend) nodePathDir(key string) string { + return filepath.Join(b.path, key) + "/" +} diff --git a/physical/etcd_test.go b/physical/etcd_test.go new file mode 100644 index 0000000000..b40f9427d7 --- /dev/null +++ b/physical/etcd_test.go @@ -0,0 +1,40 @@ +package physical + +import ( + "fmt" + "os" + "testing" + "time" + + "github.com/coreos/go-etcd/etcd" +) + +func TestEtcdBackend(t *testing.T) { + addr := os.Getenv("ETCD_ADDR") + if addr == "" { + t.SkipNow() + } + + client := etcd.NewClient([]string{addr}) + if !client.SyncCluster() { + t.Fatalf("err: %v", EtcdSyncClusterError) + } + + randPath := fmt.Sprintf("/vault-%d", time.Now().Unix()) + defer func() { + if _, err := client.Delete(randPath, true); err != nil { + t.Fatalf("err: %v", err) + } + }() + + b, err := NewBackend("etcd", map[string]string{ + "address": addr, + "path": randPath, + }) + if err != nil { + t.Fatalf("err: %s", err) + } + + testBackend(t, b) + testBackend_ListPrefix(t, b) +} diff --git a/physical/physical.go b/physical/physical.go index 000bf40e3e..b992c10102 100644 --- a/physical/physical.go +++ b/physical/physical.go @@ -83,4 +83,5 @@ var BuiltinBackends = map[string]Factory{ "zookeeper": newZookeeperBackend, "file": newFileBackend, "s3": newS3Backend, + "etcd": newEtcdBackend, } diff --git a/website/source/docs/config/index.html.md b/website/source/docs/config/index.html.md index 4b53eaf080..8d9dd57b84 100644 --- a/website/source/docs/config/index.html.md +++ b/website/source/docs/config/index.html.md @@ -60,6 +60,9 @@ durability, etc. * `zookeeper` - Store data within [Zookeeper](https://zookeeper.apache.org/). This backend does not support HA. + * `etcd` - Store data within [etcd](https://coreos.com/etcd/). + This backend does not support HA. + * `s3` - Store data within an S3 bucket [S3](http://aws.amazon.com/s3/). This backend does not support HA. @@ -107,6 +110,17 @@ For Zookeeper, the following options are supported: Can be comma separated list (host:port) of many Zookeeper instances. Defaults to "localhost:2181" if not specified. +#### Backend Reference: etcd + +For etcd, the following options are supported: + + * `path` (optional) - The path within etcd where data will be stored. + Defaults to "vault/". + + * `address` (optional) - The address(es) of the etcd instance(s) to talk to. + Can be comma separated list (protocol://host:port) of many etcd instances. + Defaults to "http://localhost:4001" if not specified. + #### Backend Reference: S3 For S3, the following options are supported: