aports/community/openscap/no-deprecated-docker.patch
2023-04-29 02:14:42 +02:00

1036 lines
39 KiB
Diff

From 5eabf7d1614334928b47dfb655b82b8407536bfd Mon Sep 17 00:00:00 2001
From: Craig Andrews <candrews@integralblue.com>
Date: Tue, 14 Feb 2023 08:57:36 -0500
Subject: [PATCH 1/5] Remove usage of atomic from oscap-docker
The last release of atomic was 5 years ago.
The project is officially deprecated.
No Linux distributions still package atomic.
---
utils/oscap-docker.in | 36 +-
.../oscap_docker_python/oscap_docker_util.py | 377 +++++++-----------
.../oscap_docker_util_noatomic.py | 227 -----------
3 files changed, 157 insertions(+), 483 deletions(-)
delete mode 100644 utils/oscap_docker_python/oscap_docker_util_noatomic.py
diff --git a/utils/oscap-docker.in b/utils/oscap-docker.in
index 4868eb5e11a..edcdc0e5e62 100644
--- a/utils/oscap-docker.in
+++ b/utils/oscap-docker.in
@@ -21,8 +21,7 @@
''' oscap docker command '''
import argparse
-from oscap_docker_python.oscap_docker_util import OscapAtomicScan, \
- OscapDockerScan, isAtomicLoaded
+from oscap_docker_python.oscap_docker_util import OscapDockerScan
import docker
import traceback
@@ -47,9 +46,6 @@ if __name__ == '__main__':
more about OSCAP-ARGUMENTS')
parser.add_argument('--oscap', dest='oscap_binary', default='',
help='Set the oscap binary to use')
-
- parser.add_argument('--disable-atomic', dest='noatomic', action='store_true',
- help="Force to use native docker API instead of atomic")
subparser = parser.add_subparsers(help="commands")
# Scan CVEs in image
@@ -94,28 +90,14 @@ if __name__ == '__main__':
sys.exit(1)
try:
- if isAtomicLoaded() and not args.noatomic:
- print("Using Atomic API")
- OS = OscapAtomicScan(oscap_binary=args.oscap_binary)
- if args.action == "scan":
- rc = OscapAtomicScan.scan(OS, args.scan_target, leftover_args)
- elif args.action == "scan_cve":
- rc = OscapAtomicScan.scan_cve(OS, args.scan_target, leftover_args)
- else:
- parser.print_help()
- sys.exit(2)
-
- else: # without atomic
- print("Using native Docker API")
-
- ODS = OscapDockerScan(args.scan_target, args.is_image, args.oscap_binary)
- if args.action == "scan":
- rc = OscapDockerScan.scan(ODS, leftover_args)
- elif args.action == "scan_cve":
- rc = OscapDockerScan.scan_cve(ODS, leftover_args)
- else:
- parser.print_help()
- sys.exit(2)
+ ODS = OscapDockerScan(args.scan_target, args.is_image, args.oscap_binary)
+ if args.action == "scan":
+ rc = OscapDockerScan.scan(ODS, leftover_args)
+ elif args.action == "scan_cve":
+ rc = OscapDockerScan.scan_cve(ODS, leftover_args)
+ else:
+ parser.print_help()
+ sys.exit(2)
except (ValueError, RuntimeError) as e:
raise e
diff --git a/utils/oscap_docker_python/oscap_docker_util.py b/utils/oscap_docker_python/oscap_docker_util.py
index 2674a44381a..16ce51ebac4 100644
--- a/utils/oscap_docker_python/oscap_docker_util.py
+++ b/utils/oscap_docker_python/oscap_docker_util.py
@@ -16,293 +16,212 @@
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301 USA
-''' Utilities for oscap-docker '''
-
from __future__ import print_function
import os
import tempfile
-import subprocess
-import platform
import shutil
from oscap_docker_python.get_cve_input import getInputCVE
import sys
import docker
+import uuid
import collections
-from oscap_docker_python.oscap_docker_util_noatomic import OscapDockerScan
from oscap_docker_python.oscap_docker_common import oscap_chroot, get_dist, \
OscapResult, OscapError
-atomic_loaded = False
-
-
-class AtomicError(Exception):
- """Exception raised when an error happens in atomic import
- """
- def __init__(self, message):
- self.message = message
-
-
-try:
- from Atomic.mount import DockerMount
- from Atomic.mount import MountError
- import inspect
- if "mnt_mkdir" not in inspect.getargspec(DockerMount.__init__).args:
- raise AtomicError(
- "\"Atomic.mount.DockerMount\" has been successfully imported but "
- "it doesn't support the mnt_mkdir argument. Please upgrade your "
- "Atomic installation to 1.4 or higher.\n"
- )
-
- # we only care about method names
- member_methods = [
- x[0] for x in
- inspect.getmembers(
- DockerMount, predicate=lambda member:
- inspect.isfunction(member) or inspect.ismethod(member)
- )
- ]
-
- if "_clean_temp_container_by_path" not in member_methods:
- raise AtomicError(
- "\"Atomic.mount.DockerMount\" has been successfully imported but "
- "it doesn't have the _clean_temp_container_by_path method. Please "
- "upgrade your Atomic installation to 1.4 or higher.\n"
- )
+class OscapError(Exception):
+ ''' oscap Error'''
+ pass
- # if all imports are ok we can use atomic
- atomic_loaded = True
-except ImportError:
- sys.stderr.write(
- "Failed to import \"Atomic.mount.DockerMount\". It seems Atomic has "
- "not been installed.\n"
- )
+OscapResult = collections.namedtuple("OscapResult", ("returncode", "stdout", "stderr"))
-except AtomicError as err:
- sys.stderr.write(err.message)
+class OscapDockerScan(object):
-def isAtomicLoaded():
- return atomic_loaded
+ def __init__(self, target, is_image=False, oscap_binary='oscap'):
+ # init docker low level api (usefull for deep details like container pid)
+ try:
+ self.client = docker.APIClient()
+ except AttributeError:
+ self.client = docker.Client()
-class OscapHelpers(object):
- ''' oscap class full of helpers for scanning '''
- CPE = 'oval:org.open-scap.cpe.rhel:def:'
- DISTS = ["8", "7", "6", "5"]
-
- def __init__(self, cve_input_dir, oscap_binary):
- self.cve_input_dir = cve_input_dir
+ # init docker high level API (to deal with start/stop/run containers/image)
+ self.client_api = docker.from_env()
+ self.is_image = is_image
+ self.stop_at_end = False # stop the container after scan if True
self.oscap_binary = oscap_binary or 'oscap'
+ self.container_name = None
+ self.image_name = None
+
+ if self.is_image:
+ self.image_name, self.config = self._get_image_name_and_config(target)
+ if self.image_name:
+ print("Running given image in a temporary container ...")
+ self.container_name = "tmp_oscap_" + str(uuid.uuid1())
+
+ try:
+ tmp_cont = self.client.create_container(
+ self.image_name, 'bash', name=self.container_name, tty=True)
+ # tty=True is required in order to keep the container running
+ self.client.start(container=tmp_cont.get('Id'))
+
+ self.config = self.client.inspect_container(self.container_name)
+ if int(self.config["State"]["Pid"]) == 0:
+ sys.stderr.write("Cannot run image {0}.\n".format(self.image_name))
+ else:
+ self.pid = int(self.config["State"]["Pid"])
+ except Exception as e:
+ sys.stderr.write("Cannot run image {0}.\n".format(self.image_name))
+ raise e
+ else:
+ raise ValueError("Image {0} not found.\n".format(target))
- @staticmethod
- def _mk_tmp_dir(tmp_dir):
- '''
- Creates a temporary directory and returns the whole
- path name
- '''
- tempfile.tempdir = tmp_dir
- return tempfile.mkdtemp()
+ else:
+ self.container_name, self.config = \
+ self._get_container_name_and_config(target)
+
+ # is the container running ?
+ if int(self.config["State"]["Pid"]) == 0:
+ print("Container {0} is stopped, running it temporarily ..."
+ .format(self.container_name))
+
+ self.client_api.containers.get(self.container_name).start()
+ self.container_name, self.config = \
+ self._get_container_name_and_config(target)
+
+ if int(self.config["State"]["Pid"]) == 0:
+ sys.stderr.write(
+ "Cannot keep running container {0}, skip it.\n \
+ Please start this container before scan it.\n"
+ .format(self.container_name))
+ else:
+ self.stop_at_end = True
- @staticmethod
- def _rm_tmp_dir(tmp_dir):
- '''
- Deletes the temporary directory created for the purposes
- of mount
- '''
- shutil.rmtree(tmp_dir)
+ # now we are sure that the container is running, get its PID
+ self.pid = int(self.config["State"]["Pid"])
+
+ if self._check_container_mountpoint():
+ self.mountpoint = "/proc/{0}/root".format(self.pid)
+ print("Docker container {0} ready to be scanned."
+ .format(self.container_name))
+ else:
+ self._end()
+ raise RuntimeError(
+ "Cannot access mountpoint of container {0}, "
+ "please RUN WITH ROOT privileges.\n"
+ .format(self.container_name))
+
+ def _end(self):
+ if self.is_image:
+ # stop and remove the temporary container
+ self.client.stop(self.container_name)
+ self.client.remove_container(self.container_name)
+ print("Temporary container {0} cleaned".format(self.container_name))
+ else:
+ if self.stop_at_end:
+ # just stop the container if the tool have started it.
+ self.client.stop(self.container_name)
- def _get_target_name_and_config(self, target):
+ def _get_image_name_and_config(self, target):
'''
- Determines if target is image or container. For images returns full
- image name if exists or image ID otherwise. For containers returns
+ Ensure that target is an image.
+ Returns full image name if exists or image ID otherwise.
+ For containers returns
container name if exists or container ID otherwise.
'''
+
try:
- client = docker.APIClient()
- except AttributeError:
- client = docker.Client()
- try:
- image = client.inspect_image(target)
+ image = self.client.inspect_image(target)
if image["RepoTags"]:
name = ", ".join(image["RepoTags"])
else:
name = image["Id"][len("sha256:"):][:10]
- return "docker-image://{}".format(name), image["Config"]
+ return name, image
except docker.errors.NotFound:
- try:
- container = client.inspect_container(target)
- if container["Name"]:
- name = container["Name"].lstrip("/")
- else:
- name = container["Id"][:10]
- return "docker-container://{}".format(name), container["Config"]
- except docker.errors.NotFound:
- return "unknown", {}
+ return None, {}
- def _scan_cve(self, chroot, target, dist, scan_args):
+ def _get_container_name_and_config(self, target):
'''
- Scan a chroot for cves
+ Ensure that target is a container.
+ Returns container name if exists or container ID otherwise.
'''
- cve_input = getInputCVE.dist_cve_name.format(dist)
-
- args = ("oval", "eval")
- for a in scan_args:
- args += (a,)
- args += (os.path.join(self.cve_input_dir, cve_input),)
-
- name, conf = self._get_target_name_and_config(target)
-
- return oscap_chroot(chroot, self.oscap_binary, args, name,
- conf.get("Env", []) or [])
-
- def _scan(self, chroot, target, scan_args):
- '''
- Scan a container or image
- '''
-
- name, conf = self._get_target_name_and_config(target)
- return oscap_chroot(chroot, self.oscap_binary, scan_args, name,
- conf.get("Env", []) or [])
+ try:
+ container = self.client.inspect_container(target)
+ if container["Name"]:
+ name = container["Name"].lstrip("/")
+ else:
+ name = container["Id"][:10]
+ return name, container
+ except docker.errors.NotFound:
+ return None, {}
- def resolve_image(self, image):
+ def _check_container_mountpoint(self):
'''
- Given an image or container name, uuid, or partial, return the
- uuid or iid or False if cannot be identified
+ Ensure that the container fs is well mounted and return its path
'''
- # TODO
- pass
+ return os.access("/proc/{0}/root".format(self.pid), os.R_OK)
- def _cleanup_by_path(self, path, DM):
+ def scan_cve(self, scan_args):
'''
- Cleans up the mounted chroot by umounting it and
- removing the temporary directory
+ Wrapper function for scanning cve of a mounted container
'''
- # Sometimes when this def is called, path will have 'rootfs'
- # appended. If it does, strip it and proceed
- _no_rootfs = path
- if os.path.basename(path) == 'rootfs':
- _no_rootfs = os.path.dirname(path)
-
- # umount chroot
- DM.unmount_path(_no_rootfs)
-
- # clean up temporary container
- DM._clean_temp_container_by_path(_no_rootfs)
- os.rmdir(_no_rootfs)
+ tmp_dir = tempfile.mkdtemp()
-def mount_image_filesystem():
- _tmp_mnt_dir = DM.mount(image)
+ # Figure out which RHEL dist is in the chroot
+ dist = get_dist(self.mountpoint, self.oscap_binary,
+ self.config["Config"].get("Env", []) or [])
-
-class OscapAtomicScan(object):
- def __init__(self, tmp_dir=tempfile.gettempdir(), mnt_dir=None,
- hours_old=2, oscap_binary=''):
- self.tmp_dir = tmp_dir
- self.helper = OscapHelpers(tmp_dir, oscap_binary)
- self.mnt_dir = mnt_dir
- self.hours_old = hours_old
-
- def _ensure_mnt_dir(self):
- '''
- Ensure existing temporary directory
- '''
- if self.mnt_dir is None:
- return tempfile.mkdtemp()
- else:
- return self.mnt_dir
-
- def _remove_mnt_dir(self, mnt_dir):
- '''
- Remove temporary directory, but only if the directory was not
- passed through __init__
- '''
- if self.mnt_dir is None:
- os.rmdir(mnt_dir)
-
- def _find_chroot_path(self, mnt_dir):
- '''
- Remember actual mounted fs in 'rootfs' for devicemapper
- '''
- rootfs_path = os.path.join(mnt_dir, 'rootfs')
- if os.path.exists(rootfs_path):
- chroot = rootfs_path
- else:
- chroot = mnt_dir
- return chroot
-
- def scan_cve(self, image, scan_args):
- '''
- Wrapper function for scanning a container or image
- '''
-
- mnt_dir = self._ensure_mnt_dir()
-
- # Mount the temporary image/container to the dir
- DM = DockerMount(mnt_dir, mnt_mkdir=True)
- try:
- _tmp_mnt_dir = DM.mount(image)
- except MountError as e:
- sys.stderr.write(str(e) + "\n")
+ if dist is None:
+ sys.stderr.write("{0} is not based on RHEL\n"
+ .format(self.image_name or self.container_name))
return None
- try:
- chroot = self._find_chroot_path(_tmp_mnt_dir)
+ # Fetch the CVE input data for the dist
+ fetch = getInputCVE(tmp_dir)
+ cve_file = fetch._fetch_single(dist)
- # Figure out which RHEL dist is in the chroot
- name, conf = self.helper._get_target_name_and_config(image)
- dist = get_dist(chroot, self.helper.oscap_binary, conf.get("Env", []) or [])
+ print("CVEs downloaded in " + cve_file)
+
+ args = ("oval", "eval")
+ for a in scan_args:
+ args += (a,)
+ args += (cve_file,)
- if dist is None:
- sys.stderr.write("{0} is not based on RHEL\n".format(image))
- return None
+ scan_result = oscap_chroot(
+ self.mountpoint, self.oscap_binary, args,
+ self.image_name or self.container_name,
+ self.config["Config"].get("Env", []) or [] # because Env can exists but be None
+ )
- # Fetch the CVE input data for the dist
- fetch = getInputCVE(self.tmp_dir)
- fetch._fetch_single(dist)
+ print(scan_result.stdout)
+ print(scan_result.stderr, file=sys.stderr)
- # Scan the chroot
- scan_result = self.helper._scan_cve(chroot, image, dist, scan_args)
- print(scan_result.stdout)
- print(scan_result.stderr, file=sys.stderr)
+ # cleanup
- finally:
- # Clean up
- self.helper._cleanup_by_path(_tmp_mnt_dir, DM)
- self._remove_mnt_dir(mnt_dir)
+ print("Cleaning temporary files ...")
+ shutil.rmtree(tmp_dir)
+ self._end()
return scan_result.returncode
- def scan(self, image, scan_args):
+ def scan(self, scan_args):
'''
- Wrapper function for basic security scans using
- openscap
+ Wrapper function forwarding oscap args for an offline scan
'''
+ scan_result = oscap_chroot(
+ "/proc/{0}/root".format(self.pid),
+ self.oscap_binary, scan_args,
+ self.image_name or self.container_name,
+ self.config["Config"].get("Env", []) or [] # because Env can exists but be None
+ )
- mnt_dir = self._ensure_mnt_dir()
-
- # Mount the temporary image/container to the dir
- DM = DockerMount(mnt_dir, mnt_mkdir=True)
- try:
- _tmp_mnt_dir = DM.mount(image)
- except MountError as e:
- sys.stderr.write(str(e) + "\n")
- return None
-
- try:
- chroot = self._find_chroot_path(_tmp_mnt_dir)
-
- # Scan the chroot
- scan_result = self.helper._scan(chroot, image, scan_args)
- print(scan_result.stdout)
- print(scan_result.stderr, file=sys.stderr)
+ print(scan_result.stdout)
+ print(scan_result.stderr, file=sys.stderr)
- finally:
- # Clean up
- self.helper._cleanup_by_path(_tmp_mnt_dir, DM)
- self._remove_mnt_dir(mnt_dir)
+ self._end()
return scan_result.returncode
diff --git a/utils/oscap_docker_python/oscap_docker_util_noatomic.py b/utils/oscap_docker_python/oscap_docker_util_noatomic.py
deleted file mode 100644
index 16ce51ebac4..00000000000
--- a/utils/oscap_docker_python/oscap_docker_util_noatomic.py
+++ /dev/null
@@ -1,227 +0,0 @@
-# Copyright (C) 2015 Brent Baude <bbaude@redhat.com>
-# Copyright (C) 2019 Dominique Blaze <contact@d0m.tech>
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the
-# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
-# Boston, MA 02110-1301 USA
-
-from __future__ import print_function
-
-import os
-import tempfile
-import shutil
-from oscap_docker_python.get_cve_input import getInputCVE
-import sys
-import docker
-import uuid
-import collections
-from oscap_docker_python.oscap_docker_common import oscap_chroot, get_dist, \
- OscapResult, OscapError
-
-
-class OscapError(Exception):
- ''' oscap Error'''
- pass
-
-
-OscapResult = collections.namedtuple("OscapResult", ("returncode", "stdout", "stderr"))
-
-
-class OscapDockerScan(object):
-
- def __init__(self, target, is_image=False, oscap_binary='oscap'):
-
- # init docker low level api (usefull for deep details like container pid)
- try:
- self.client = docker.APIClient()
- except AttributeError:
- self.client = docker.Client()
-
- # init docker high level API (to deal with start/stop/run containers/image)
- self.client_api = docker.from_env()
- self.is_image = is_image
- self.stop_at_end = False # stop the container after scan if True
- self.oscap_binary = oscap_binary or 'oscap'
- self.container_name = None
- self.image_name = None
-
- if self.is_image:
- self.image_name, self.config = self._get_image_name_and_config(target)
- if self.image_name:
- print("Running given image in a temporary container ...")
- self.container_name = "tmp_oscap_" + str(uuid.uuid1())
-
- try:
- tmp_cont = self.client.create_container(
- self.image_name, 'bash', name=self.container_name, tty=True)
- # tty=True is required in order to keep the container running
- self.client.start(container=tmp_cont.get('Id'))
-
- self.config = self.client.inspect_container(self.container_name)
- if int(self.config["State"]["Pid"]) == 0:
- sys.stderr.write("Cannot run image {0}.\n".format(self.image_name))
- else:
- self.pid = int(self.config["State"]["Pid"])
- except Exception as e:
- sys.stderr.write("Cannot run image {0}.\n".format(self.image_name))
- raise e
- else:
- raise ValueError("Image {0} not found.\n".format(target))
-
- else:
- self.container_name, self.config = \
- self._get_container_name_and_config(target)
-
- # is the container running ?
- if int(self.config["State"]["Pid"]) == 0:
- print("Container {0} is stopped, running it temporarily ..."
- .format(self.container_name))
-
- self.client_api.containers.get(self.container_name).start()
- self.container_name, self.config = \
- self._get_container_name_and_config(target)
-
- if int(self.config["State"]["Pid"]) == 0:
- sys.stderr.write(
- "Cannot keep running container {0}, skip it.\n \
- Please start this container before scan it.\n"
- .format(self.container_name))
- else:
- self.stop_at_end = True
-
- # now we are sure that the container is running, get its PID
- self.pid = int(self.config["State"]["Pid"])
-
- if self._check_container_mountpoint():
- self.mountpoint = "/proc/{0}/root".format(self.pid)
- print("Docker container {0} ready to be scanned."
- .format(self.container_name))
- else:
- self._end()
- raise RuntimeError(
- "Cannot access mountpoint of container {0}, "
- "please RUN WITH ROOT privileges.\n"
- .format(self.container_name))
-
- def _end(self):
- if self.is_image:
- # stop and remove the temporary container
- self.client.stop(self.container_name)
- self.client.remove_container(self.container_name)
- print("Temporary container {0} cleaned".format(self.container_name))
- else:
- if self.stop_at_end:
- # just stop the container if the tool have started it.
- self.client.stop(self.container_name)
-
- def _get_image_name_and_config(self, target):
- '''
- Ensure that target is an image.
- Returns full image name if exists or image ID otherwise.
- For containers returns
- container name if exists or container ID otherwise.
- '''
-
- try:
- image = self.client.inspect_image(target)
- if image["RepoTags"]:
- name = ", ".join(image["RepoTags"])
- else:
- name = image["Id"][len("sha256:"):][:10]
- return name, image
- except docker.errors.NotFound:
- return None, {}
-
- def _get_container_name_and_config(self, target):
- '''
- Ensure that target is a container.
- Returns container name if exists or container ID otherwise.
- '''
- try:
- container = self.client.inspect_container(target)
- if container["Name"]:
- name = container["Name"].lstrip("/")
- else:
- name = container["Id"][:10]
- return name, container
- except docker.errors.NotFound:
- return None, {}
-
- def _check_container_mountpoint(self):
- '''
- Ensure that the container fs is well mounted and return its path
- '''
- return os.access("/proc/{0}/root".format(self.pid), os.R_OK)
-
- def scan_cve(self, scan_args):
- '''
- Wrapper function for scanning cve of a mounted container
- '''
-
- tmp_dir = tempfile.mkdtemp()
-
- # Figure out which RHEL dist is in the chroot
- dist = get_dist(self.mountpoint, self.oscap_binary,
- self.config["Config"].get("Env", []) or [])
-
- if dist is None:
- sys.stderr.write("{0} is not based on RHEL\n"
- .format(self.image_name or self.container_name))
- return None
-
- # Fetch the CVE input data for the dist
- fetch = getInputCVE(tmp_dir)
- cve_file = fetch._fetch_single(dist)
-
- print("CVEs downloaded in " + cve_file)
-
- args = ("oval", "eval")
- for a in scan_args:
- args += (a,)
- args += (cve_file,)
-
- scan_result = oscap_chroot(
- self.mountpoint, self.oscap_binary, args,
- self.image_name or self.container_name,
- self.config["Config"].get("Env", []) or [] # because Env can exists but be None
- )
-
- print(scan_result.stdout)
- print(scan_result.stderr, file=sys.stderr)
-
- # cleanup
-
- print("Cleaning temporary files ...")
- shutil.rmtree(tmp_dir)
- self._end()
-
- return scan_result.returncode
-
- def scan(self, scan_args):
- '''
- Wrapper function forwarding oscap args for an offline scan
- '''
- scan_result = oscap_chroot(
- "/proc/{0}/root".format(self.pid),
- self.oscap_binary, scan_args,
- self.image_name or self.container_name,
- self.config["Config"].get("Env", []) or [] # because Env can exists but be None
- )
-
- print(scan_result.stdout)
- print(scan_result.stderr, file=sys.stderr)
-
- self._end()
-
- return scan_result.returncode
From 4ae0e013ace925d9c453a3807768a268a8dcd423 Mon Sep 17 00:00:00 2001
From: Craig Andrews <candrews@integralblue.com>
Date: Tue, 14 Feb 2023 09:05:02 -0500
Subject: [PATCH 2/5] Get the docker client api from the client
Getting the client api from the client reuses the client configuration.
This fixes an inconsistency where environment variable configuration is
used for the client but not for the client api.
This change therefore enables environment variable configuration (e.g.
DOCKER_HOST) to work.
---
utils/oscap_docker_python/oscap_docker_util.py | 9 +++------
1 file changed, 3 insertions(+), 6 deletions(-)
diff --git a/utils/oscap_docker_python/oscap_docker_util.py b/utils/oscap_docker_python/oscap_docker_util.py
index 16ce51ebac4..6e251cc4114 100644
--- a/utils/oscap_docker_python/oscap_docker_util.py
+++ b/utils/oscap_docker_python/oscap_docker_util.py
@@ -42,12 +42,6 @@ class OscapDockerScan(object):
def __init__(self, target, is_image=False, oscap_binary='oscap'):
- # init docker low level api (usefull for deep details like container pid)
- try:
- self.client = docker.APIClient()
- except AttributeError:
- self.client = docker.Client()
-
# init docker high level API (to deal with start/stop/run containers/image)
self.client_api = docker.from_env()
self.is_image = is_image
@@ -56,6 +50,9 @@ def __init__(self, target, is_image=False, oscap_binary='oscap'):
self.container_name = None
self.image_name = None
+ # init docker low level api (useful for deep details like container pid)
+ self.client = self.client_api.api
+
if self.is_image:
self.image_name, self.config = self._get_image_name_and_config(target)
if self.image_name:
From aba83f3b59b693a12a889c4d65d1a9f0616bc164 Mon Sep 17 00:00:00 2001
From: Craig Andrews <candrews@integralblue.com>
Date: Tue, 14 Feb 2023 12:16:07 -0500
Subject: [PATCH 3/5] Remove docker ping test
OscapDockerScan will throw an error if docker isn't running;
there's no need to create a separate docker client in a different way
just to do this ping test.
---
utils/oscap-docker.in | 18 ------------------
1 file changed, 18 deletions(-)
diff --git a/utils/oscap-docker.in b/utils/oscap-docker.in
index edcdc0e5e62..5f162bf91fa 100644
--- a/utils/oscap-docker.in
+++ b/utils/oscap-docker.in
@@ -29,17 +29,6 @@ import sys
from requests import exceptions
-def ping_docker():
- ''' Simple check if the docker daemon is running '''
- # Class docker.Client was renamed to docker.APIClient in
- # python-docker-py 2.0.0.
- try:
- client = docker.APIClient()
- except AttributeError:
- client = docker.Client()
- client.ping()
-
-
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='oscap docker',
epilog='See `man oscap` to learn \
@@ -82,13 +71,6 @@ if __name__ == '__main__':
parser.print_help()
sys.exit(2)
- try:
- ping_docker()
-
- except exceptions.ConnectionError:
- print("The docker daemon does not appear to be running")
- sys.exit(1)
-
try:
ODS = OscapDockerScan(args.scan_target, args.is_image, args.oscap_binary)
if args.action == "scan":
From bd448ab71b0bb4d7dd6dc0215d0e06fa907199c6 Mon Sep 17 00:00:00 2001
From: Craig Andrews <candrews@integralblue.com>
Date: Wed, 15 Feb 2023 14:38:36 -0500
Subject: [PATCH 4/5] oscap-docker: don't start images to scan them
Starting images to scan them has downsides, including that the image must be able to be started and stay running, which isn't always easy. For example, not all images contain `bash` or a shell at all.
By instead using the approach taken by `oscap-podman`, extracting the image and then scanning it (instead of starting the container and scanning the resulting mount), the container never needs to be run.
This approach allows `oscap-docker` to scan any image (including those without `bash`).
---
.../oscap_docker_python/oscap_docker_util.py | 73 ++++++++++---------
1 file changed, 39 insertions(+), 34 deletions(-)
diff --git a/utils/oscap_docker_python/oscap_docker_util.py b/utils/oscap_docker_python/oscap_docker_util.py
index 6e251cc4114..2ab80595220 100644
--- a/utils/oscap_docker_python/oscap_docker_util.py
+++ b/utils/oscap_docker_python/oscap_docker_util.py
@@ -19,6 +19,10 @@
from __future__ import print_function
import os
+import io
+from pathlib import Path
+from itertools import chain
+import tarfile
import tempfile
import shutil
from oscap_docker_python.get_cve_input import getInputCVE
@@ -45,10 +49,10 @@ def __init__(self, target, is_image=False, oscap_binary='oscap'):
# init docker high level API (to deal with start/stop/run containers/image)
self.client_api = docker.from_env()
self.is_image = is_image
- self.stop_at_end = False # stop the container after scan if True
self.oscap_binary = oscap_binary or 'oscap'
self.container_name = None
self.image_name = None
+ self.extracted_container = False
# init docker low level api (useful for deep details like container pid)
self.client = self.client_api.api
@@ -56,52 +60,40 @@ def __init__(self, target, is_image=False, oscap_binary='oscap'):
if self.is_image:
self.image_name, self.config = self._get_image_name_and_config(target)
if self.image_name:
- print("Running given image in a temporary container ...")
+ print("Creating a temporary container for the image...")
self.container_name = "tmp_oscap_" + str(uuid.uuid1())
try:
tmp_cont = self.client.create_container(
- self.image_name, 'bash', name=self.container_name, tty=True)
- # tty=True is required in order to keep the container running
- self.client.start(container=tmp_cont.get('Id'))
+ self.image_name, name=self.container_name)
self.config = self.client.inspect_container(self.container_name)
- if int(self.config["State"]["Pid"]) == 0:
- sys.stderr.write("Cannot run image {0}.\n".format(self.image_name))
- else:
- self.pid = int(self.config["State"]["Pid"])
except Exception as e:
- sys.stderr.write("Cannot run image {0}.\n".format(self.image_name))
+ sys.stderr.write("Cannot create container for image {0}.\n".format(self.image_name))
raise e
+
+ self._extract_container()
else:
raise ValueError("Image {0} not found.\n".format(target))
else:
self.container_name, self.config = \
self._get_container_name_and_config(target)
+ if not self.container_name:
+ raise ValueError("Container {0} not found.\n".format(target))
# is the container running ?
if int(self.config["State"]["Pid"]) == 0:
- print("Container {0} is stopped, running it temporarily ..."
+ print("Container {0} is stopped"
.format(self.container_name))
- self.client_api.containers.get(self.container_name).start()
- self.container_name, self.config = \
- self._get_container_name_and_config(target)
-
- if int(self.config["State"]["Pid"]) == 0:
- sys.stderr.write(
- "Cannot keep running container {0}, skip it.\n \
- Please start this container before scan it.\n"
- .format(self.container_name))
- else:
- self.stop_at_end = True
-
- # now we are sure that the container is running, get its PID
- self.pid = int(self.config["State"]["Pid"])
+ self._extract_container()
+ else:
+ print("Container {0} is running, using its existing mount..."
+ .format(self.container_name))
+ self.mountpoint = "/proc/{0}/root".format(self.config["State"]["Pid"])
if self._check_container_mountpoint():
- self.mountpoint = "/proc/{0}/root".format(self.pid)
print("Docker container {0} ready to be scanned."
.format(self.container_name))
else:
@@ -113,14 +105,27 @@ def __init__(self, target, is_image=False, oscap_binary='oscap'):
def _end(self):
if self.is_image:
- # stop and remove the temporary container
- self.client.stop(self.container_name)
+ # remove the temporary container
self.client.remove_container(self.container_name)
print("Temporary container {0} cleaned".format(self.container_name))
- else:
- if self.stop_at_end:
- # just stop the container if the tool have started it.
- self.client.stop(self.container_name)
+ if self.extracted_container:
+ print("Cleaning temporary extracted container...")
+ shutil.rmtree(self.mountpoint)
+
+ def _extract_container(self):
+ '''
+ Extracts the container and sets mountpoint to the extracted directory
+ '''
+ with tempfile.TemporaryFile() as tar:
+ for chunk in self.client.export(self.container_name):
+ tar.write(chunk)
+ tar.seek(0)
+ self.mountpoint = tempfile.mkdtemp()
+ self.extracted_container = True
+ with tarfile.open(fileobj=tar) as tf:
+ tf.extractall(path=self.mountpoint)
+ Path(os.path.join(self.mountpoint, '.dockerenv')).touch()
+
def _get_image_name_and_config(self, target):
'''
@@ -159,7 +164,7 @@ def _check_container_mountpoint(self):
'''
Ensure that the container fs is well mounted and return its path
'''
- return os.access("/proc/{0}/root".format(self.pid), os.R_OK)
+ return os.access(self.mountpoint, os.R_OK)
def scan_cve(self, scan_args):
'''
@@ -210,7 +215,7 @@ def scan(self, scan_args):
Wrapper function forwarding oscap args for an offline scan
'''
scan_result = oscap_chroot(
- "/proc/{0}/root".format(self.pid),
+ self.mountpoint,
self.oscap_binary, scan_args,
self.image_name or self.container_name,
self.config["Config"].get("Env", []) or [] # because Env can exists but be None
From 497effd0ba26995ff1211ed794707f023f7122b8 Mon Sep 17 00:00:00 2001
From: Craig Andrews <candrews@integralblue.com>
Date: Tue, 21 Feb 2023 15:35:54 -0500
Subject: [PATCH 5/5] oscap_docker_util.py: use container id instead of
generating a name
---
utils/oscap_docker_python/oscap_docker_util.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/utils/oscap_docker_python/oscap_docker_util.py b/utils/oscap_docker_python/oscap_docker_util.py
index 2ab80595220..0e1dc1dcb6d 100644
--- a/utils/oscap_docker_python/oscap_docker_util.py
+++ b/utils/oscap_docker_python/oscap_docker_util.py
@@ -61,12 +61,12 @@ def __init__(self, target, is_image=False, oscap_binary='oscap'):
self.image_name, self.config = self._get_image_name_and_config(target)
if self.image_name:
print("Creating a temporary container for the image...")
- self.container_name = "tmp_oscap_" + str(uuid.uuid1())
try:
tmp_cont = self.client.create_container(
self.image_name, name=self.container_name)
+ self.container_name = tmp_cont["Id"]
self.config = self.client.inspect_container(self.container_name)
except Exception as e:
sys.stderr.write("Cannot create container for image {0}.\n".format(self.image_name))