diff options
-rw-r--r-- | TODO | 2 | ||||
-rwxr-xr-x | collector.py | 201 | ||||
-rwxr-xr-x | kernel-check.py | 33 | ||||
-rwxr-xr-x | kernellib.py | 203 |
4 files changed, 220 insertions, 219 deletions
@@ -2,13 +2,11 @@ collector ========= - Test units for fuzzing - Add more logging messages -- Rework logging to work with kernel-check and! collector - Check file integrity - Use more telling variables - Clean up code - Implement DTD - Function documentation / manpages -- Move more methods to kernellib.py - Move is_interval to kernellib.py - Implement webserver for kernel-check diff --git a/collector.py b/collector.py index ec634ef..8c3b4fc 100755 --- a/collector.py +++ b/collector.py @@ -16,34 +16,17 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -from __future__ import with_statement -import portage.versions -import xml.etree.cElementTree as et - -import datetime import getopt -import logging as log -import mmap import os -import re import sys import time import kernellib as lib -regex = { - 'bugzilla' : re.compile(r'(?<=bug.cgi\?id=)\d*'), - 'gpatches_v' : re.compile(r'(?<=K_GENPATCHES_VER\=\").+(?=\")'), - 'gpatches_w' : re.compile(r'(?<=K_WANT_GENPATCHES\=\").+(?=\")') -} env = { - 'datefmt' : '%H:%M:%S', 'delay' : 0, 'force' : False, - 'format' : '[%(asctime)s] %(levelname)-6s: %(message)s', - 'filename' : None, - 'level' : log.INFO, 'skip' : False } @@ -83,35 +66,33 @@ def main(argv): if os.access(os.path.dirname(arg) , os.W_OK) and os.path.isdir(arg): env['tree'] = arg elif opt in ('-v', '--verbose'): - env['level'] = log.DEBUG - - log.basicConfig(format = env['format'], datefmt = env['datefmt'], level = env['level'], filename = env['filename']) + lib.verbose = True for directory in envdir: if not os.path.isdir(envdir[directory]): os.makedirs(envdir[directory]) - #print parse_genpatches_list(envdir['tree']) + #print lib.parse_genpatches_list(envdir['tree']) - log.info('Receiving the latest xml file from the nvd...') - log.info(receive_nvd_recent(envdir['nvd'])) + print('Receiving the latest xml file from the nvd...') + lib.receive_nvd_recent(envdir['nvd'], env['force']) if not env['skip']: - log.info('Receiving earlier xml files from the nvd...') - receive_nvd_all(envdir['nvd']) + print('Receiving earlier xml files from the nvd...') + lib.receive_nvd_all(envdir['nvd'], env['force']) - log.info('Creating the nvd dictionary...') - nvd_dict = parse_nvd_dict(envdir['nvd']) + print('Creating the nvd dictionary...') + nvd_dict = lib.parse_nvd_dict(envdir['nvd']) - log.info('Receiving the kernel bug list from bugzilla...') - log.info(receive_bugzilla_list(envdir['temp'])) + print('Receiving the kernel bug list from bugzilla...') + lib.receive_bugzilla_list(envdir['temp'], env['force']) - log.info('Creating the xml files...') - buglist = parse_bugzilla_list(os.path.join(envdir['temp'], 'list.xml')) + print('Creating the xml files...') + buglist = lib.parse_bugzilla_list(os.path.join(envdir['temp'], 'list.xml')) for item in buglist: - log.debug(receive_bugzilla_bug(envdir['bug'], item)) - bug_dict = parse_bugzilla_dict(envdir['bug'], item) + lib.receive_bugzilla_bug(envdir['bug'], item, env['force']) + bug_dict = lib.parse_bugzilla_dict(envdir['bug'], item) lib.write_cve_file(envdir['out'], item, bug_dict, nvd_dict) time.sleep(env['delay']) @@ -131,159 +112,5 @@ def usage(): sys.exit() -def receive_nvd_recent(directory): - 'Download the latest CVEs file from the National Vulnerability Database' - - path = 'http://nvd.nist.gov/download/' - - return lib.receive_file(directory, path, 'nvdcve-recent.xml', env['force']) - - -def receive_nvd_all(directory): - 'Download all earlier CVEs files from the National Vulnerability Database' - - path = 'http://nvd.nist.gov/download/' - year = datetime.datetime.now().year - - if year < 2002 or year > 2020: - year = 2020 - - for i in xrange(2002, year + 1): - log.info(lib.receive_file(directory, path, 'nvdcve-' + str(i) + '.xml', env['force'], max_age = datetime.timedelta(1))) - - -def receive_bugzilla_list(directory): - 'Download a list containing all Bugzilla kernel bugs' - - status = ['NEW', 'ASSIGNED', 'REOPENED', 'RESOLVED', 'VERIFIED', 'CLOSED'] - resolution = ['FIXED', 'LATER', 'CANTFIX', 'TEST-REQUEST', 'UPSTREAM', '---'] - - path = ['https://bugs.gentoo.org/buglist.cgi?query_format=advanced&component=Kernel'] - for i in status: - path.append('&bug_status=' + i) - for i in resolution: - path.append('&resolution=' + i) - path.append('#') - - return lib.receive_file(directory, ''.join(path), 'list.xml', env['force']) - - -def receive_bugzilla_bug(directory, bugid): - 'Download the xml file of a particular Bugzilla kernel bug' - - path = 'https://bugs.gentoo.org/show_bug.cgi?ctype=xml&id=' - - return lib.receive_file(directory, path, bugid, env['force']) - - -def parse_genpatches_list(directory): - 'Returns a list containing all genpatches' - - genpatches = list() - directory = os.path.join(directory, 'sys-kernel') - - for sources in os.listdir(directory): - if '-sources' in sources: - - for ebuild in os.listdir(os.path.join(directory, sources)): - if '.ebuild' in ebuild: - - pkg = portage.versions.catpkgsplit('sys-kernel/' + ebuild[:-7]) - - with open(os.path.join(directory, sources, ebuild), 'r') as ebuild_file: - content = ebuild_file.read() - - try: - genpatch_v = regex['gpatches_v'].findall(content)[0] - genpatch_w = regex['gpatches_w'].findall(content)[0] - except: - break - - genpatch = [pkg[1], pkg[2] + '_' + pkg[3] if pkg[3] != 'r0' else pkg[2], pkg[2] + '-' + genpatch_v, genpatch_w] - genpatches.append(genpatch) - - return genpatches - - -def parse_bugzilla_list(filename): - 'Returns a list containing all bugzilla kernel bugs' - - with open(filename, 'r+') as buglist_file: - memory_map = mmap.mmap(buglist_file.fileno(), 0) - - buglist = regex['bugzilla'].findall(memory_map.read(-1)) - log.info(str(len(buglist)) + ' bugs found') - - return buglist - - -def parse_bugzilla_dict(directory, bugid): - 'Returns a dictionary containing information about a kernel bug' - - bugfilename = os.path.join(directory, bugid) - root = et.parse(open(bugfilename, 'r')).getroot()[0] - - elements = ['bug_id', 'creation_ts', 'reporter', 'status_whiteboard', 'short_desc', 'rep_platform'] - dic = dict() - - for i in elements: - if i == 'short_desc': - cves = lib.extract_cves(root.find(i).text) - if len(cves) > 0: - dic['cves'] = cves - else: - log.error('Invalid cve for bugid [%s]' % root.find('bug_id').text) - log.error('-> ' + root.find(i).text) - try: - dic[i] = root.find(i).text - except AttributeError: - dic[i] = None - - return dic - - -def parse_nvd_dict(directory): - 'Returns a dictionary containing all CVEs from the National Vulnerability Database' - - namespace = '{http://nvd.nist.gov/feeds/cve/1.2}' - main = dict() - cve = str() - - for nvdfile in os.listdir(directory): - nvdfilename = os.path.join(directory, nvdfile) - - with open(nvdfilename, 'r+') as xml_data: - memory_map = mmap.mmap(xml_data.fileno(), 0) - root = et.parse(memory_map).getroot() - - elements = ['CVSS_vector', 'CVSS_score', 'name', 'severity', 'published'] - - for i, tree in enumerate(root): - dic = dict() - url = list() - - for j in elements: - if j == 'name': - cve = tree.get(j) - else: - dic[j] = tree.get(j) - - reftree = tree.find(namespace + 'refs') - reftree.tag = reftree.tag.replace(namespace,'') - for elem in reftree.findall('.//*'): - elem.tag = elem.tag.replace(namespace,'') - dic['refs'] = reftree - - desc = tree.find(''.join(namespace + tag + '/' for tag in ('desc', 'descript'))) - if desc != None: - dic['desc'] = desc.text - else: - dic['desc'] = '' - - main[cve] = dic - - return main - - if __name__ == '__main__': main(sys.argv[1:]) diff --git a/kernel-check.py b/kernel-check.py index 19bfa54..22cc5e8 100755 --- a/kernel-check.py +++ b/kernel-check.py @@ -35,15 +35,15 @@ def main(argv): 'Main function' data = { - 'kernel' : kernel_version(), + 'kernel' : lib.kernel_version(), 'genpatch' : None, - 'arch' : commands.getstatusoutput('uname -m')[1], #FIXME: cat /proc/? + 'arch' : commands.getstatusoutput('uname -m')[1], #FIXME: cat /proc/? + move to lib 'cve' : [345, 284, 274, 0, 4], #TODO: Implement 'latest' : '2.6.27-r13 (gentoo-sources)' #TODO: Implement } try: - opts, args = getopt.getopt(argv, 'ahnr:s:v', ['about', 'help', 'nocolor', 'report=', 'show=']) + opts, args = getopt.getopt(argv, 'ahnr:s:v', ['about', 'help', 'nocolor', 'report=', 'show=', 'verbose']) except getopt.GetoptError: usage() @@ -61,6 +61,8 @@ def main(argv): elif opt in ('-s', '--show'): return # TODO: show_bugid(arg) + elif opt in ('-v', '--verbose'): + lib.verbose = True eout = portage.output.EOutput() color = portage.output.colorize @@ -128,11 +130,12 @@ def usage(): print 'kernel-check.py: Kernel security information\r\n' print 'Usage:' - print ' -a --about : display information about this tool' - print ' -h --help : display help information' - print ' -n --nocolor : disable colors' - print ' -r --report [file] : create a security report' - print ' -s --show [bugid] : display information about a bug' + print ' -a --about : display information about this tool' + print ' -h --help : display help information' + print ' -n --nocolor : disable colors' + print ' -r --report [file] : create a security report' + print ' -s --show [bugid] : display information about a bug' + print ' -v --verbose : display debugging information' sys.exit() def about(): @@ -146,19 +149,5 @@ def about(): sys.exit() -def kernel_version(): - 'Provides the kernel version as dictionary' - - try: - with open('/proc/sys/kernel/osrelease', 'r') as proc: - osrelease = proc.read()[:-1] - return lib.extract_version_from(osrelease) - except IOError, e: - print 'Could not read version from /proc/sys/kernel/osrelease: %s' % str(e) - return None - - return None - - if __name__ == '__main__': main(sys.argv[1:]) diff --git a/kernellib.py b/kernellib.py index 2903082..629bfdc 100755 --- a/kernellib.py +++ b/kernellib.py @@ -22,24 +22,41 @@ import xml.etree.cElementTree as et import cStringIO import datetime -import logging as log +import logging +import mmap import os import re import urllib + #TODO wb_match is too long (?) regex = { + 'bugzilla' : re.compile(r'(?<=bug.cgi\?id=)\d*'), + 'gpatches_v' : re.compile(r'(?<=K_GENPATCHES_VER\=\").+(?=\")'), + 'gpatches_w' : re.compile(r'(?<=K_WANT_GENPATCHES\=\").+(?=\")'), + 'groupall' : re.compile(r'[ (]*CVE-(\d{4})([-,(){}|, \d]+)'), 'groupsplit' : re.compile(r'(?<=\D)(\d{4})(?=\D|$)'), 'wb_match' : re.compile(r'\s*\[\s*([^ +<=>]+)\s*(\+?)\s*([<=>]{1,2})\s*([^ <=>\]]+)\s*(?:([<=>]{1,2})\s*([^ \]]+))?\s*\]\s*(.*)'), 'wb_version' : re.compile(r'^(?:\d{1,2}\.){0,3}\d{1,2}(?:[-_](?:r|rc)?\d{1,2})*$'), - + 'version' : re.compile(r'^((?:\d{1,2}\.){0,3}\d{1,2})(-.*)?$'), 'rcd' : re.compile(r'^rc\d{1,3}$'), 'gitd' : re.compile(r'^git(\d{1,3})$'), - 'rd' : re.compile(r'^r\d{1,3}$'), + 'rd' : re.compile(r'^r\d{1,3}$') } +verbose = False +logging.basicConfig(format='%(levelname)-6s[%(asctime)s] : %(message)s', datefmt='%H:%M:%S', level=logging.DEBUG, filename=None) + +def debug(msg): + if verbose: + logging.debug(msg) + + +def error(msg): + logging.error(msg) + def receive_file(directory, path, xml_file, force, max_age = datetime.timedelta(0, 59*60)): 'Generic download function' @@ -50,7 +67,8 @@ def receive_file(directory, path, xml_file, force, max_age = datetime.timedelta( if os.path.exists(filename): age = datetime.datetime.now() - datetime.datetime.fromtimestamp(os.path.getmtime(filename)) if age < max_age: - return 'File %s - %sKB is recent enough [%s]' % (filename, os.path.getsize(filename)/1024, str(age)[:-7]) + debug('File %s - %sKB is recent enough [%s]' % (filename, os.path.getsize(filename)/1024, str(age)[:-7])) + return with closing(cStringIO.StringIO()) as data: with closing(urllib.urlopen(path + xml_file)) as resource: @@ -59,7 +77,163 @@ def receive_file(directory, path, xml_file, force, max_age = datetime.timedelta( with open(filename, 'w') as output: output.write(data.getvalue()) - return 'File %s - %sKB received' % (filename, os.path.getsize(filename)/1024) + debug('File %s - %sKB received' % (filename, os.path.getsize(filename)/1024)) + + +def receive_nvd_recent(directory, force): + 'Download the latest CVEs file from the National Vulnerability Database' + + path = 'http://nvd.nist.gov/download/' + + receive_file(directory, path, 'nvdcve-recent.xml', force) + + +def receive_nvd_all(directory, force): + 'Download all earlier CVEs files from the National Vulnerability Database' + + path = 'http://nvd.nist.gov/download/' + year = datetime.datetime.now().year + + if year < 2002 or year > 2020: + year = 2020 + + for i in xrange(2002, year + 1): + receive_file(directory, path, 'nvdcve-' + str(i) + '.xml', force, max_age = datetime.timedelta(1)) + + +def receive_bugzilla_list(directory, force): + 'Download a list containing all Bugzilla kernel bugs' + + status = ['NEW', 'ASSIGNED', 'REOPENED', 'RESOLVED', 'VERIFIED', 'CLOSED'] + resolution = ['FIXED', 'LATER', 'CANTFIX', 'TEST-REQUEST', 'UPSTREAM', '---'] + + path = ['https://bugs.gentoo.org/buglist.cgi?query_format=advanced&component=Kernel'] + for i in status: + path.append('&bug_status=' + i) + for i in resolution: + path.append('&resolution=' + i) + path.append('#') + + receive_file(directory, ''.join(path), 'list.xml', force) + + +def receive_bugzilla_bug(directory, bugid, force): + 'Download the xml file of a particular Bugzilla kernel bug' + + path = 'https://bugs.gentoo.org/show_bug.cgi?ctype=xml&id=' + + receive_file(directory, path, bugid, force) + + + +def parse_genpatches_list(directory): + 'Returns a list containing all genpatches' + + genpatches = list() + directory = os.path.join(directory, 'sys-kernel') + + for sources in os.listdir(directory): + if '-sources' in sources: + + for ebuild in os.listdir(os.path.join(directory, sources)): + if '.ebuild' in ebuild: + + pkg = portage.versions.catpkgsplit('sys-kernel/' + ebuild[:-7]) + + with open(os.path.join(directory, sources, ebuild), 'r') as ebuild_file: + content = ebuild_file.read() + + try: + genpatch_v = regex['gpatches_v'].findall(content)[0] + genpatch_w = regex['gpatches_w'].findall(content)[0] + except: + break + + genpatch = [pkg[1], pkg[2] + '_' + pkg[3] if pkg[3] != 'r0' else pkg[2], pkg[2] + '-' + genpatch_v, genpatch_w] + genpatches.append(genpatch) + + return genpatches + + +def parse_bugzilla_list(filename): + 'Returns a list containing all bugzilla kernel bugs' + + with open(filename, 'r+') as buglist_file: + memory_map = mmap.mmap(buglist_file.fileno(), 0) + + buglist = regex['bugzilla'].findall(memory_map.read(-1)) + debug(str(len(buglist)) + ' bugs found') + + return buglist + + +def parse_bugzilla_dict(directory, bugid): + 'Returns a dictionary containing information about a kernel bug' + + bugfilename = os.path.join(directory, bugid) + root = et.parse(open(bugfilename, 'r')).getroot()[0] + + elements = ['bug_id', 'creation_ts', 'reporter', 'status_whiteboard', 'short_desc', 'rep_platform'] + dic = dict() + + for i in elements: + if i == 'short_desc': + cves = extract_cves(root.find(i).text) + if len(cves) > 0: + dic['cves'] = cves + else: + error('Invalid cve for bugid [%s]' % root.find('bug_id').text) + error('-> ' + root.find(i).text) + try: + dic[i] = root.find(i).text + except AttributeError: + dic[i] = None + + return dic + + +def parse_nvd_dict(directory): + 'Returns a dictionary containing all CVEs from the National Vulnerability Database' + + namespace = '{http://nvd.nist.gov/feeds/cve/1.2}' + main = dict() + cve = str() + + for nvdfile in os.listdir(directory): + nvdfilename = os.path.join(directory, nvdfile) + + with open(nvdfilename, 'r+') as xml_data: + memory_map = mmap.mmap(xml_data.fileno(), 0) + root = et.parse(memory_map).getroot() + + elements = ['CVSS_vector', 'CVSS_score', 'name', 'severity', 'published'] + + for i, tree in enumerate(root): + dic = dict() + url = list() + + for j in elements: + if j == 'name': + cve = tree.get(j) + else: + dic[j] = tree.get(j) + + reftree = tree.find(namespace + 'refs') + reftree.tag = reftree.tag.replace(namespace,'') + for elem in reftree.findall('.//*'): + elem.tag = elem.tag.replace(namespace,'') + dic['refs'] = reftree + + desc = tree.find(''.join(namespace + tag + '/' for tag in ('desc', 'descript'))) + if desc != None: + dic['desc'] = desc.text + else: + dic['desc'] = '' + + main[cve] = dic + + return main + def indent(node, level=0): 'Indents xml layout for printing' @@ -128,8 +302,8 @@ def write_cve_file(directory, bugid, bug_dict, nvd_dict): for item in intervals: item.to_xml(affectedroot) else: - log.error('Whiteboard for bugid [%s]' % bug_dict['bug_id']) - log.error('-> %s' % bug_dict['status_whiteboard']) + error('Whiteboard for bugid [%s]' % bug_dict['bug_id']) + error('-> %s' % bug_dict['status_whiteboard']) else: node = et.SubElement(bugroot, element) node.text = xml_bug_dict[element] @@ -305,6 +479,19 @@ def from_whiteboard(whiteboard): return affected +def kernel_version(): + 'Provides the kernel version as dictionary' + + try: + with open('/proc/sys/kernel/osrelease', 'r') as proc: + osrelease = proc.read()[:-1] + return extract_version_from(osrelease) + except IOError, e: + error('Could not read version from /proc/sys/kernel/osrelease: %s' % str(e)) + return None + + return None + def extract_version_from(release): #TODO: Short Description @@ -337,7 +524,7 @@ def extract_version_from(release): elif elem in kernel_types: source = elem elif elem != '': - print 'Dropping unknown version component \'%s\', probably local tag.' % elem + error('Dropping unknown version component \'%s\', probably local tag.' % elem) kernel = { 'revision' : revision, |