Merge branch 'master' into feature/inclusterconfig

This commit is contained in:
Dmitry Mishin 2017-07-28 12:14:44 -07:00
commit 1d62de7159
4 changed files with 169 additions and 52 deletions

View File

@ -29,6 +29,9 @@ Features:
Network policy controller is responsible for reading the namespace, network policy and pods information from Kubernetes API server and configure iptables accordingly to provide ingress filter to the pods.
Kube-router supports the networking.k8s.io/NetworkPolicy API or network policy V1/GA
[semantics](https://github.com/kubernetes/kubernetes/pull/39164#issue-197243974) and also network policy beta semantics.
Please read blog for design details of Network Policy controller
https://cloudnativelabs.github.io/post/2017-05-1-kube-network-policies/

View File

@ -63,6 +63,9 @@ is easy with kube-router -- just add a flag to kube-router. It uses ipsets with
iptables to ensure your firewall rules have as little performance impact on your
cluster as possible.
Kube-router supports the networking.k8s.io/NetworkPolicy API or network policy V1/GA
[semantics](https://github.com/kubernetes/kubernetes/pull/39164#issue-197243974) and also network policy beta semantics.
Read more about kube-router's approach to Kubernetes Network Policies:
- [Enforcing Kubernetes network policies with iptables](https://cloudnativelabs.github.io/post/2017-05-1-kube-network-policies/)

View File

@ -21,6 +21,8 @@ import (
"github.com/janeczku/go-ipset/ipset"
"k8s.io/client-go/kubernetes"
apiv1 "k8s.io/client-go/pkg/api/v1"
apiextensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
networking "k8s.io/client-go/pkg/apis/networking/v1"
)
// Network policy controller provides an ingress firewall for the pods as per the defined network policies.
@ -34,10 +36,11 @@ import (
// dropped by the rule in the pod chain, if there is no match.
type NetworkPolicyController struct {
nodeIP net.IP
nodeHostName string
mu sync.Mutex
syncPeriod time.Duration
nodeIP net.IP
nodeHostName string
mu sync.Mutex
syncPeriod time.Duration
v1NetworkPolicy bool
// list of all active network policies expressed as networkPolicyInfo
networkPoliciesInfo *[]networkPolicyInfo
@ -94,7 +97,7 @@ func (npc *NetworkPolicyController) Run(stopCh <-chan struct{}, wg *sync.WaitGro
glog.Infof("Performing periodic syn of the iptables to reflect network policies")
err := npc.Sync()
if err != nil {
glog.Errorf("Error during periodic sync: ", err)
glog.Errorf("Error during periodic sync: " + err.Error())
}
} else {
continue
@ -122,7 +125,6 @@ func (npc *NetworkPolicyController) OnPodUpdate(podUpdate *watchers.PodUpdate) {
}
func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *watchers.NetworkPolicyUpdate) {
glog.Infof("Received network policy update namspace:%s policy name:%s", networkPolicyUpdate.NetworkPolicy.Namespace, networkPolicyUpdate.NetworkPolicy.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
if err != nil {
@ -134,6 +136,12 @@ func (npc *NetworkPolicyController) OnNetworkPolicyUpdate(networkPolicyUpdate *w
}
func (npc *NetworkPolicyController) OnNamespaceUpdate(namespaceUpdate *watchers.NamespaceUpdate) {
// namespace (and annotations on it) has no significance in GA ver of network policy
if npc.v1NetworkPolicy {
return
}
glog.Infof("Received namesapce update namspace:%s", namespaceUpdate.Namespace.Name)
if watchers.PodWatcher.HasSynced() && watchers.NetworkPolicyWatcher.HasSynced() {
err := npc.Sync()
@ -162,24 +170,33 @@ func (npc *NetworkPolicyController) Sync() error {
glog.Infof("sync iptables took %v", time.Since(start))
}()
npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: %s" + err.Error())
if npc.v1NetworkPolicy {
npc.networkPoliciesInfo, err = buildNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
}
} else {
npc.networkPoliciesInfo, err = buildBetaNetworkPoliciesInfo()
if err != nil {
return errors.New("Aborting sync. Failed to build network policies: " + err.Error())
}
}
activePolicyChains, err := npc.syncNetworkPolicyChains()
if err != nil {
return errors.New("Aborting sync. Failed to sync network policy chains: %s" + err.Error())
return errors.New("Aborting sync. Failed to sync network policy chains: " + err.Error())
}
activePodFwChains, err := npc.syncPodFirewallChains()
if err != nil {
return errors.New("Aborting sync. Failed to sync pod firewalls: %s" + err.Error())
return errors.New("Aborting sync. Failed to sync pod firewalls: " + err.Error())
}
err = cleanupStaleRules(activePolicyChains, activePodFwChains)
if err != nil {
return errors.New("Aborting sync. Failed to cleanup stale iptable rules: %s" + err.Error())
return errors.New("Aborting sync. Failed to cleanup stale iptable rules: " + err.Error())
}
return nil
@ -334,11 +351,11 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er
}
// loop through the pods running on the node which has default ingress to be denied
podsOnNodeInfo, err := getPodsRunningOnNode(npc.nodeIP.String())
firewallEnabledPods, err := npc.getFirewallEnabledPods(npc.nodeIP.String())
if err != nil {
return nil, err
}
for _, pod := range *podsOnNodeInfo {
for _, pod := range *firewallEnabledPods {
// below condition occurs when we get trasient update while removing or adding pod
// subseqent update will do the correct action
@ -346,7 +363,7 @@ func (npc *NetworkPolicyController) syncPodFirewallChains() (map[string]bool, er
continue
}
// ensure pod specfic firewall chain exist for all the pods running on this node
// ensure pod specfic firewall chain exist for all the pods that need ingress firewall
podFwChainName := podFirewallChainName(pod.namespace, pod.name)
err = iptablesCmdHandler.NewChain("filter", podFwChainName)
if err != nil && err.(*iptables.Error).ExitStatus() != 1 {
@ -557,7 +574,7 @@ func cleanupStaleRules(activePolicyChains, activePodFwChains map[string]bool) er
return nil
}
func getPodsRunningOnNode(nodeIp string) (*map[string]podInfo, error) {
func (npc *NetworkPolicyController) getFirewallEnabledPods(nodeIp string) (*map[string]podInfo, error) {
nodePods := make(map[string]podInfo)
@ -565,12 +582,39 @@ func getPodsRunningOnNode(nodeIp string) (*map[string]podInfo, error) {
if strings.Compare(pod.Status.HostIP, nodeIp) != 0 {
continue
}
default_policy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace)
if err != nil {
return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error())
}
if strings.Compare(default_policy, "DefaultDeny") != 0 {
continue
if npc.v1NetworkPolicy {
podNeedsFirewall := false
for _, policy_obj := range watchers.NetworkPolicyWatcher.List() {
policy, _ := policy_obj.(*networking.NetworkPolicy)
if policy.Namespace != pod.ObjectMeta.Namespace {
continue
}
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace,
policy.Spec.PodSelector.MatchLabels)
if err != nil {
return nil, fmt.Errorf("Failed to get the pods %s", err.Error())
}
for _, matchingPod := range matchingPods {
if matchingPod.ObjectMeta.Name == pod.ObjectMeta.Name {
podNeedsFirewall = true
break
}
}
if podNeedsFirewall {
break
}
}
if !podNeedsFirewall {
continue
}
} else {
default_policy, err := getNameSpaceDefaultPolicy(pod.ObjectMeta.Namespace)
if err != nil {
return nil, fmt.Errorf("Failed to get the namespace default ingress policy %s", err.Error())
}
if strings.Compare(default_policy, "DefaultDeny") != 0 {
continue
}
}
nodePods[pod.Status.PodIP] = podInfo{ip: pod.Status.PodIP,
name: pod.ObjectMeta.Name,
@ -584,7 +628,66 @@ func buildNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
NetworkPolicies := make([]networkPolicyInfo, 0)
for _, policy := range watchers.NetworkPolicyWatcher.List() {
for _, policy_obj := range watchers.NetworkPolicyWatcher.List() {
policy, ok := policy_obj.(*networking.NetworkPolicy)
if !ok {
return nil, fmt.Errorf("Failed to convert")
}
newPolicy := networkPolicyInfo{
name: policy.Name,
namespace: policy.Namespace,
labels: policy.Spec.PodSelector.MatchLabels,
}
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, policy.Spec.PodSelector.MatchLabels)
newPolicy.destPods = make(map[string]podInfo)
newPolicy.ingressRules = make([]ingressRule, 0)
if err == nil {
for _, matchingPod := range matchingPods {
newPolicy.destPods[matchingPod.Status.PodIP] = podInfo{ip: matchingPod.Status.PodIP,
name: matchingPod.ObjectMeta.Name,
namespace: matchingPod.ObjectMeta.Namespace,
labels: matchingPod.ObjectMeta.Labels}
}
}
for _, specIngressRule := range policy.Spec.Ingress {
ingressRule := ingressRule{}
ingressRule.ports = make([]protocolAndPort, 0)
for _, port := range specIngressRule.Ports {
protocolAndPort := protocolAndPort{protocol: string(*port.Protocol), port: port.Port.String()}
ingressRule.ports = append(ingressRule.ports, protocolAndPort)
}
ingressRule.srcPods = make([]podInfo, 0)
for _, peer := range specIngressRule.From {
matchingPods, err := watchers.PodWatcher.ListByNamespaceAndLabels(policy.Namespace, peer.PodSelector.MatchLabels)
if err == nil {
for _, matchingPod := range matchingPods {
ingressRule.srcPods = append(ingressRule.srcPods,
podInfo{ip: matchingPod.Status.PodIP,
name: matchingPod.ObjectMeta.Name,
namespace: matchingPod.ObjectMeta.Namespace,
labels: matchingPod.ObjectMeta.Labels})
}
}
}
newPolicy.ingressRules = append(newPolicy.ingressRules, ingressRule)
}
NetworkPolicies = append(NetworkPolicies, newPolicy)
}
return &NetworkPolicies, nil
}
func buildBetaNetworkPoliciesInfo() (*[]networkPolicyInfo, error) {
NetworkPolicies := make([]networkPolicyInfo, 0)
for _, policy_obj := range watchers.NetworkPolicyWatcher.List() {
policy, _ := policy_obj.(*apiextensions.NetworkPolicy)
newPolicy := networkPolicyInfo{
name: policy.Name,
namespace: policy.Namespace,
@ -781,6 +884,13 @@ func NewNetworkPolicyController(clientset *kubernetes.Clientset, config *options
npc.syncPeriod = config.IPTablesSyncPeriod
npc.v1NetworkPolicy = true
v, _ := clientset.Discovery().ServerVersion()
minorVer, _ := strconv.Atoi(v.Minor)
if v.Major == "1" && minorVer < 7 {
npc.v1NetworkPolicy = false
}
node, err := utils.GetNodeObject(clientset, config.HostnameOverride)
if err != nil {
return nil, err

View File

@ -2,6 +2,7 @@ package watchers
import (
"reflect"
"strconv"
"time"
"github.com/cloudnativelabs/kube-router/utils"
@ -9,11 +10,12 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
apiextensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
networking "k8s.io/client-go/pkg/apis/networking/v1"
cache "k8s.io/client-go/tools/cache"
)
type NetworkPolicyUpdate struct {
NetworkPolicy *apiextensions.NetworkPolicy
NetworkPolicy interface{}
Op Operation
}
@ -33,28 +35,16 @@ type NetworkPolicyUpdatesHandler interface {
}
func (npw *networkPolicyWatcher) networkPolicyAddEventHandler(obj interface{}) {
policy, ok := obj.(*apiextensions.NetworkPolicy)
if !ok {
return
}
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: ADD, NetworkPolicy: policy})
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: ADD, NetworkPolicy: obj})
}
func (npw *networkPolicyWatcher) networkPolicyDeleteEventHandler(obj interface{}) {
policy, ok := obj.(*apiextensions.NetworkPolicy)
if !ok {
return
}
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: REMOVE, NetworkPolicy: policy})
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: REMOVE, NetworkPolicy: obj})
}
func (npw *networkPolicyWatcher) networkPolicyUpdateEventHandler(oldObj, newObj interface{}) {
policy, ok := newObj.(*apiextensions.NetworkPolicy)
if !ok {
return
}
if !reflect.DeepEqual(newObj, oldObj) {
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: UPDATE, NetworkPolicy: policy})
npw.broadcaster.Notify(&NetworkPolicyUpdate{Op: UPDATE, NetworkPolicy: newObj})
}
}
@ -64,13 +54,8 @@ func (npw *networkPolicyWatcher) RegisterHandler(handler NetworkPolicyUpdatesHan
}))
}
func (npw *networkPolicyWatcher) List() []*apiextensions.NetworkPolicy {
obj_list := npw.networkPolicyLister.List()
np_instances := make([]*apiextensions.NetworkPolicy, len(obj_list))
for i, ins := range obj_list {
np_instances[i] = ins.(*apiextensions.NetworkPolicy)
}
return np_instances
func (npw *networkPolicyWatcher) List() []interface{} {
return npw.networkPolicyLister.List()
}
func (npw *networkPolicyWatcher) HasSynced() bool {
@ -91,13 +76,29 @@ func StartNetworkPolicyWatcher(clientset *kubernetes.Clientset, resyncPeriod tim
}
npw.clientset = clientset
v1NetworkPolicy := true
v, _ := clientset.Discovery().ServerVersion()
minorVer, _ := strconv.Atoi(v.Minor)
if v.Major == "1" && minorVer < 7 {
v1NetworkPolicy = false
}
npw.broadcaster = utils.NewBroadcaster()
lw := cache.NewListWatchFromClient(clientset.Extensions().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
lw,
&apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
var lw *cache.ListWatch
if v1NetworkPolicy {
lw = cache.NewListWatchFromClient(clientset.Networking().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
lw, &networking.NetworkPolicy{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
} else {
lw = cache.NewListWatchFromClient(clientset.Extensions().RESTClient(), "networkpolicies", metav1.NamespaceAll, fields.Everything())
npw.networkPolicyLister, npw.networkPolicyController = cache.NewIndexerInformer(
lw, &apiextensions.NetworkPolicy{}, resyncPeriod, eventHandler,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
}
networkPolicyStopCh = make(chan struct{})
go npw.networkPolicyController.Run(networkPolicyStopCh)
return &npw, nil