算法日志的存在核心在于搭建自检系统

发布时间:2024-01-05 11:48:38

? ? ? ? 相信每一个人执行与日志有关的任务都会遇到这样难题吧?长达几万行的日志,如果我们单纯用肉眼去一个个排查,那么恐怕所耗费的时间是以天为计量单位了。当然这是一种比较夸张的情况,根据我的项目经验,正常情况是十几个站点的人可能每天需要花费5-6个小时去排查日志或者与日志有关却能被日志替代的内容。如果我们能搭建一个智能化的系统,使得这个系统可以智能的读取日志中我们关键的信息,那么会发生什么呢?

? ? ? ? 有些人问,我就想用肉眼看,不行嘛?其实,不是肉眼看不起,而是智能化日志更有性价比!没错,如果我们搭建这样一个智能化日志自检系统,十几个站每个团体每天都能节省5-6个工时去干别的事情。

NOTE:本文只是介绍一种思想,所以不会有过多的具体代码讲解,但是可以给上一个成功的案例手册,仅供参考。

现场人员自检失败表计点位教程

NOTE:?如果没有meterPoint_Self-Checking_sys.py“脚本的请联系我们进行提供

👇

运行该脚本,参考运行命令如下(请确保此时您的工作目录处于meter/log)

#这是一条参考运行命令,请您根据您实际的情况修改-p和-t参数的具体内容
python3?meterPoint_Self-Checking_sys.py?-p?meterlog?-t?30M00000036658634_task1703485183168_20231225141946
#?@pararm:-p?是存放日志的路径,该日志包含您刚跑完测试的日志内容。
#?@pararm:-t?是您任务的序号,如下图,Ftp图片路径下包含”task“的字符串,也就是灰色框框住的那一串正式您此次任务的序号,输入30M00000036658634_task1703485183168_20231225141946

👇

自动生成自检报表meterlog_checking.txt

里面部分关键内容如下:

👇

接下来大家请对照这张表,找到【需要现场人员自检】【错误】进行搜索排查,有多个,可以从上往下慢慢来。

👇

以【通用类】<序号7>"该点位没有录入"作为例子,打开自检文本meterlog_checking.txt

👇

👇

如果出现无需现场人员自检的错误,需要截图一下日志中有关内容,可能后续还需提供图片我们这边进行优化

一些使用样例图:?

# -*- coding: utf-8 -*-
'''
参考diamagnetic:
# 兰江
python3 meterPoint_Self-Checking_sys.py -p meterlog -t 30M00000036658634_task1703485183168_20231225141946
# 金鼎
python3 meterPoint_Self-Checking_sys.py -p meterlog -t 30M00000036658634_task1703485183168_20231225141947
'''
import re
import json
import argparse

# 创建命令行参数解析器
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--log_file', help='log文件路径')
parser.add_argument('-t', '--task_id', help='任务ID')
args = parser.parse_args()

def extract_debug_segments(log_file):
    debug_segments = []
    with open(log_file, 'r') as file:
        lines = file.readlines()
        start_line = None
        end_line = None
        segment = []

        for i, line in enumerate(lines):
            if 'Debug' in line or '收到请求' in line or '数据库信息' in line:
                if start_line is None:
                    start_line = i
                segment.append(line.strip())   

            elif '结果放入队列待发送' in line:
                if start_line is not None:
                    end_line = i
                    segment.append(line)
                    debug_segments.append([segment, start_line, end_line])
                    segment = []
                    start_line = None
                    end_line = None

    return debug_segments
def process_request(request_str):
    target_index = request_str.index("{")
    # 按照":"分割字符串
    split_str = request_str[target_index:]
    # 获取分割后数组中最后一个索引所保存的信息
    json_str = split_str.strip().replace("—", "-").replace("'", "\"")

    objectList_request_str = json.loads(json_str)['objectList'][0]
    # for k in objectList_request_str:
    #     print(k)
    return objectList_request_str

def get_pointList_length(json_str):
    
    pattern = r"'Position': '(\[.*?\])'"
    matches = re.search(pattern, json_str)
    if matches is None:
        return 0
    position_list = json.loads(matches.group(1))
    # print("position_list:", position_list)
    return len(position_list)

def process_sql(json_str):

    json_str = json_str[json_str.index("MinValue"):]
    json_str = "{'" + json_str
    json_str = json_str.replace("'", "\"")
    sql_dict = json.loads(json_str)
    return sql_dict

def process_result(json_str):

    json_str = json_str[json_str.index("code"):-5]
    json_str = "{'" + json_str
    # print(json_str)
    json_str = json_str.replace("'", "\"")
    json_str = json_str.replace("None", "null")
    sql_dict = json.loads(json_str)
    return sql_dict

def contains_digit(string):
    pattern = r'\d'  # 正则表达式模式,匹配任意数字
    if re.search(pattern, string):
        return True
    else:
        return False

if __name__ == "__main__":
    # 摄像机偏移严重+模糊
    Error_withoutDetctor = []
    # 未识别出指针
    Error_withoutPointer = []
    # 读取ftp图失败
    Error_loadftp = []
    # minIO无图
    Error_withoutMinioImage = []
    # minIO错图
    Error_minioErrorImage = []
    # 点位未录入
    Error_withoutId = []
    # 表计类型录入错误
    Error_clsType = []
    # 最大最小值设置错误
    Error_minMaxSet = []
    # 最大最小值未设置
    Error_withoutMinMax = []
    # 未打刻度点位
    Error_withoutPointList = []
    # 刻度打点错误
    Error_PointList = []
    # 未识别到任何油面表!
    Error_ymb = []
    # OCR没有检测出数字
    Error_ocrRec = []
    # OCR没有检测出表盘
    Error_ocrDet = []
    # ===========================核
    # 获取命令行参数
    log_file = args.log_file
    work_id = args.task_id
    debug_segments = extract_debug_segments(log_file)
    error_num = 0
    # not_reading_num = 0
    # type_num = 0
    ymb_num, sxb_num, bj_num = 0, 0, 0
    ymb_errorNum, sxb_errorNum, bj_errorNum = 0, 0, 0
    # 过滤一遍只剩下最新的
    filter_schem = {}
    not_del_ids = []
    iter_index = 0
    for idx, segment in enumerate(debug_segments):
        strat_line = segment[1]
        end_line = segment[2]
        for line in segment[0]:
            if "收到请求" in line:
                # print('【请求信息】: ',end='')
                objectList_request_str = process_request(line)
                extract_objectId = objectList_request_str['objectId']
                if not extract_objectId in filter_schem.keys():
                    # 新增
                    filter_schem[extract_objectId] = iter_index
                else:
                    # 更新
                    filter_schem[extract_objectId] = iter_index
                    not_del_ids.append(iter_index)

                iter_index += 1
                break
    # print(not_del_ids)
    # ------------------过滤结束
    for idx, segment in enumerate(debug_segments):
        error_flag = False
        # print('Start Line:', segment[1])
        # print('End Line:', segment[2])
        for line in segment[0]:
            # print(line)
            if "收到请求" in line:
                # print('【请求信息】: ',end='')
                objectList_request_str = process_request(line)
                extract_objectId = objectList_request_str['objectId']
                # print(objectList_request_str['imageUrlList'][0], work_id)
                if not work_id in objectList_request_str['imageUrlList'][0]:
                    break
                if not filter_schem[extract_objectId] == idx:
                    break

            elif '数据库信息' in line:
                if line.split("【数据库信息】")[-1] == '{}':
                    # 数据库信息为空
                    # print('*pointList_length:0')
                    # print('{}')
                    Error_withoutId.append(extract_objectId)
                    error_flag = True
                    break
                else:
                    # 数据库有信息
                    pointList_length = get_pointList_length(line)
                    sql_schem = process_sql(line)
                    MinValue = sql_schem['MinValue']
                    MaxValue = sql_schem['MaxValue']
                    meter_type = sql_schem['AlgorithmType']
                    ImagePath = sql_schem['ImagePath']

                    if meter_type == 'meter_v5':
                        bj_num += 1
                    if meter_type == 'meter_ywj':
                        ymb_num += 1
                    if meter_type == 'paddleocr':
                        sxb_num += 1

                    if meter_type == 'meter_v5':
                        if len(MinValue)== 0 or len(MaxValue) == 0:
                            Error_withoutMinMax.append(extract_objectId)
                            MinValue = float(0)
                            MaxValue = float(100)
                            error_flag = True
                        else:
                            MinValue = float(MinValue)
                            MaxValue = float(MaxValue)

                    # 表计类型录入错误(如果打点了,但表计类型不是meter_v5)
                    if meter_type == 'meter_v5' and pointList_length == 0:
                        Error_clsType.append(extract_objectId)
                        error_flag = True
                    # 未打刻度点位
                    if meter_type == 'meter_v5' and pointList_length == 0:
                        Error_withoutPointList.append(extract_objectId)
                        error_flag = True

                    # print(sql_schem, end=',')
                    # print("*pointList_length:", pointList_length)
            
            elif '结果放入队列待发送' in line:
                result_schem = process_result(line)
                # print('【结果队列信息】:',end='')
                # print(result_schem)
                if result_schem['code'] == '2001':
                    Error_loadftp.append(extract_objectId)
                    error_flag = True
                if result_schem['desc'] == '未识别到任何油面表!':
                    error_flag = True
                    Error_ymb.append(extract_objectId)
            else:
                splitContent = line.split("【Debug】")[-1]
                if "成功检测到表盘!表盘信息是" in splitContent:
                    det_clsType = splitContent.split(":")[-1].strip().strip("").strip("[]").strip()
                    if splitContent.split(":")[-1].strip().strip("") == "[]":
                        Error_withoutDetctor.append(extract_objectId)
                        error_flag = True

                    if not 'sxb' in det_clsType and meter_type == 'paddleocr':
                        Error_ocrDet.append(extract_objectId) 
                        error_flag = True

                    if 'ywb' in det_clsType:
                        ywb_minMax = [
                            [-20, 140],
                            [0, 160]
                        ]
                        iter_minMax = [MinValue, MaxValue]
                        if not iter_minMax in ywb_minMax:
                            Error_minMaxSet.append(extract_objectId)
                            error_flag = True
                            
                    elif 'xldlb' in det_clsType:
                        xldlb_minMax = [
                            [0, 3.0],
                            [0, 10],
                            [0, 9],
                            [0, 1]
                        ]
                        iter_minMax = [MinValue, MaxValue]
                        if not iter_minMax in xldlb_minMax:
                            Error_minMaxSet.append(extract_objectId)
                            error_flag = True
                # if '动作次数' in splitContent:
                #     print(splitContent)
                # if '泄漏电流值' in splitContent:
                #     print(splitContent)
                if 'OCR没有检测出数字' in splitContent:
                    Error_ocrRec.append(extract_objectId)
                    error_flag = True

                if "没识别出指针" in splitContent:
                    Error_withoutPointer.append(extract_objectId)
                    error_flag = True

                if len(ImagePath) == 0 or "MinIo中缺失该点位基准图" in splitContent:
                    Error_withoutMinioImage.append(extract_objectId)
                    error_flag = True

                # 用于验证
                if '读数结果' in splitContent and not contains_digit(splitContent):
                    # not_reading_num +=1
                    # 验证后 无读数个数和错误个数基本一致->代表验证成功
                    # print(not_reading_num)
                    continue
        if error_flag:
            if meter_type == 'meter_v5':
                bj_errorNum += 1
            if meter_type == 'meter_ywj':
                ymb_errorNum += 1
            if meter_type == 'paddleocr':
                sxb_errorNum += 1
            error_num += 1
    print("错误总数比:【{}/{}】".format(error_num,len(debug_segments)-len(not_del_ids)))
    # ===========================核
    # 写入
    with open('meterLog_checking.txt', 'w') as output_file:
        output_file.write('您这次序号为[{}]的任务:\n---------------------------------\n一共测试表计数量:[{}]个, 错误点位为:[{}]个, 未打点个数为:[{}]。\n<在此之中>\n,指针类表计成功占[{}/{}]个\n,油面表成功占[{}/{}]个\n,数显表成功占[{}/{}]个。'.format(work_id,len(debug_segments)-len(not_del_ids),error_num,len(Error_withoutId),bj_num - bj_errorNum, bj_num,ymb_num - ymb_errorNum, ymb_num, sxb_num - sxb_errorNum, sxb_num))

        output_file.write('\n')
        output_file.write('---------------------------------\n')
        output_file.write('NOTE:接下来,请您根据所需要查询的错误名称,使用<ctrl+F>的方式进行查询。\n')
        output_file.write('---------------------------------\n')

        output_file.write("【错误】可能存在摄像机偏移严重/模糊<数量:{}>:".format(str(len(set(Error_withoutDetctor)))) + "\n")
        output_file.write("\n".join(set(Error_withoutDetctor)))
        output_file.write('\n')

        output_file.write("【错误】未识别出指针<数量:{}>:".format(str(len(set(Error_withoutPointer)))) + "\n") 
        output_file.write("\n".join(set(Error_withoutPointer)))
        output_file.write('\n')

        output_file.write("【错误】读取ftp图失败<数量:{}>:".format(str(len(set(Error_loadftp)))) + "\n")
        output_file.write("\n".join(set(Error_loadftp)))
        output_file.write('\n')

        output_file.write("【错误】minIO无图<数量:{}>:".format(str(len(set(Error_withoutMinioImage)))) + "\n")
        output_file.write("\n".join(set(Error_withoutMinioImage)))
        output_file.write('\n')

        output_file.write("【错误】该点位没有录入<数量:{}>:".format(str(len(set(Error_withoutId)))) + "\n")
        output_file.write("\n".join(set(Error_withoutId)))
        output_file.write('\n')

        output_file.write("【错误】表计类型录入错误<数量:{}>:".format(str(len(set(Error_clsType)))) + "\n")
        output_file.write("\n".join(set(Error_clsType)))
        output_file.write('\n')

        output_file.write("【错误】最大最小值未设置<数量:{}>:".format(str(len(set(Error_withoutMinMax)))) + "\n")
        output_file.write("\n".join(set(Error_withoutMinMax)))
        output_file.write('\n')

        output_file.write("【错误】未打刻度点位<数量:{}>:".format(str(len(set(Error_withoutPointList)))) + "\n")
        output_file.write("\n".join(set(Error_withoutPointList)))
        output_file.write('\n')

        output_file.write("【错误】最大最小值设置错误<数量:{}>:".format(str(len(set(Error_minMaxSet)))) + "\n")
        output_file.write("\n".join(set(Error_minMaxSet)))
        output_file.write('\n')

        output_file.write("【错误】存在刻度打点错误(暂未启用)<数量:{}>:".format(str(len(set(Error_PointList)))) + "\n")
        output_file.write("\n".join(set(Error_PointList)))
        output_file.write('\n')

        output_file.write("【错误】未识别到任何油面表<数量:{}>:".format(str(len(set(Error_ymb)))) + "\n")
        output_file.write("\n".join(set(Error_ymb)))
        output_file.write('\n')

        output_file.write("【错误】OCR没有检测出数字<数量:{}>:".format(str(len(set(Error_ocrRec)))) + "\n")
        output_file.write("\n".join(set(Error_ocrRec)))
        output_file.write('\n')

        output_file.write("【错误】OCR没有检测出表盘<数量:{}>:".format(str(len(set(Error_ocrDet)))) + "\n")
        output_file.write("\n".join(set(Error_ocrDet)))
        output_file.write('\n')
文章来源:https://blog.csdn.net/qq_51831335/article/details/135403132
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。