# Copyright 2022 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import sys
from multiprocessing import Pool
import re
import json
import os
import argparse

def getJsonFromFile(path, phase):
    with open(path) as f:
        try:
            JsonList = json.load(f)
        except json.decoder.JSONDecodeError as e:
            print(f"{e} in file: {path}")
            return []
    return JsonList[phase]

def get_pattern_dict(catchissue, i, uuid):
    pattern_dict = {}
    pattern_dict['line'] = i
    pattern_dict['uuid'] = uuid
    pattern_dict['string'] = catchissue[0]
    pattern_dict['start'] = catchissue[1]
    pattern_dict['end'] = catchissue[2]
    pattern_dict['status'] = catchissue[3]
    pattern_dict['search_type'] = catchissue[4]
    pattern_dict['url'] = catchissue[6]
    pattern_dict['description'] = catchissue[7]
    return pattern_dict
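# A minimal sketch of what one CatchIssues<phase>.json entry is assumed to
# look like, inferred from the indices consumed by get_pattern_dict() above.
# Index 5 must exist in each entry (index 7 is read) but is skipped here;
# the field values below are illustrative only, not taken from the real
# data files:
#
#   ["error: .*", 0, 0, "error", "search", 0, "", "Compiler error"]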
def addPatternToList(phaseList, log_search_patterns, uuid):
    for phase in phaseList:
        if uuid == '00000000-0000-0000-000000000000':
            path = 'LogPattern'
        else:
            path = os.path.join('Project', uuid, 'LogPattern')
        CatchIssueFile = os.path.join(path, 'CatchIssues' + phase + '.json')
        CatchIssueList = getJsonFromFile(CatchIssueFile, phase)
        # line number of the pattern entry in the json file
        i = 3
        for catchissue in CatchIssueList:
            search_pattern = get_pattern_dict(catchissue, i, uuid)
            try:
                re.compile(search_pattern['string'])
            except re.error:
                print(f"Invalid regex pattern on line: {search_pattern['line']} String: {search_pattern['string']} Project: {search_pattern['uuid']} Phase: {phase}")
            else:
                log_search_patterns[phase].append(search_pattern)
            i = i + 1
    return log_search_patterns

def get_log_search_patterns(uuid):
    path = os.path.join('LogPattern', 'PhaseList.json')
    PhaseList = getJsonFromFile(path, 'PhaseList')
    # get the patterns from the projects and add them to log_search_patterns
    log_search_patterns = {}
    for phase in PhaseList:
        log_search_patterns[phase] = []
    uuid_default = '00000000-0000-0000-000000000000'
    log_search_patterns = addPatternToList(PhaseList, log_search_patterns, uuid_default)
    #log_search_patterns = addPatternToList(PhaseList, log_search_patterns, uuid)
    return log_search_patterns

def get_search_pattern_match(search_pattern, text_line):
    if search_pattern['search_type'] == 'search':
        if re.search(search_pattern['string'], text_line):
            return True
    elif search_pattern['search_type'] == 'startswith':
        if text_line.startswith(search_pattern['string']):
            return True
    elif search_pattern['search_type'] == 'endswith':
        if text_line.endswith(search_pattern['string']):
            return True
    elif search_pattern['search_type'] == 'in':
        if search_pattern['string'] in text_line:
            return True
    return False

def search_buildlog(log_search_patterns, text_line, index):
    summary = {}
    summary[index] = {
        'text' : text_line,
        'pattern_info' : [],
    }
    for phase, search_patterns in log_search_patterns.items():
        for search_pattern in search_patterns:
            match = get_search_pattern_match(search_pattern, text_line)
            # lines that hit an Ignore pattern are dropped entirely
            if phase == 'Ignore' and match:
                return False
            elif phase != 'Ignore' and match:
                summary[index]['pattern_info'].append({
                    'search_type' : search_pattern['search_type'],
                    'status' : search_pattern['status'],
                    'line' : search_pattern['line'],
                    'search_pattern' : search_pattern['string'],
                    'phase' : phase,
                    'uuid' : search_pattern['uuid'],
                    'url' : search_pattern['url'],
                    'description' : search_pattern['description'],
                })
    if summary[index]['pattern_info']:
        return summary
    return False

def getMultiprocessingPool(args):
    return Pool(processes=int(args.cpu))

def getJsonResult(results):
    for r in results:
        try:
            value = r.get()
        except Exception as e:
            print(f'Failed with: {e}')
        else:
            if value:
                print(json.dumps(value), flush=True)

def runLogParser(args):
    index = 1
    logfile_text_dict = {}
    # NOTE: The patterns come from the files in
    # https://github.com/toralf/tinderbox/tree/master/data
    # and are stored in json files; build a dict of search patterns from them.
    log_search_patterns = get_log_search_patterns(args.uuid)
    # read the log file into a dict keyed by line number
    with open(args.file, encoding='utf8', errors='ignore') as f:
        for text_line in f:
            logfile_text_dict[index] = text_line.strip('\n')
            index = index + 1
    # run the search patterns on the text lines
    if not args.debug:
        with getMultiprocessingPool(args) as pool:
            results = [pool.apply_async(search_buildlog, args=(log_search_patterns, text, line_index,)) for line_index, text in logfile_text_dict.items()]
            getJsonResult(results)
            pool.close()
            pool.join()
    else:
        results = []
        for line_index, text in logfile_text_dict.items():
            results.append(search_buildlog(log_search_patterns, text, line_index))
        #FIXME: Json output
        #getJsonResult(results)

def main():
    # get filename, project uuid and cpu count from the command line
    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--file", required=True)
    parser.add_argument("-u", "--uuid", required=True)
    parser.add_argument("-c", "--cpu", required=True)
    #FIXME: add if args.debug ... where it is needed
    parser.add_argument("-d", "--debug", action="store_true", required=False)
    args = parser.parse_args()
    runLogParser(args)
    sys.exit()

if __name__ == "__main__":
    main()
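# Example invocation, assuming the script is saved as logparser.py (the
# filename is not given by the project and is illustrative only):
#
#   python logparser.py -f build.log -u 00000000-0000-0000-000000000000 -c 4
#
# Each log line that matches a non-Ignore pattern is printed as one json
# object per line, shaped roughly like (values illustrative):
#
#   {"42": {"text": "...", "pattern_info": [{"search_type": "search", ...}]}}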