下面是一个测试solib库中调用函数的测试脚本,但该脚本还存在这一些问题,我目前无法理解和解决;
问题:
1.我定义了logging采用日志滚动的方式,写日志,并且每个日志的大小是20M,但测试结果发现日志连1M都没到就开始轮转了,并且在轮转过程中,还出现logging写日志,却发现,日志轮转了,结果竟然报了,轮转日志不存在。
Traceback (most recent call last):
File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 77, in emit
File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 77, in emit
self.doRollover()
Traceback (most recent call last):
File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 77, in emit
self.doRollover()
File "/usr/local/python2710/lib/python2.7/logging/handlers.py", line 135, in doRollover
os.remove(dfn)
OSError: [Errno 2] No such file or directory: '/tmp/mysofuns_test.log.5'
2.在进行多线程测试时,发现写到文件中的日志和直接打到屏幕上的日志竟然不一样。但经过分析,发现写到文件中的日志,出现了重复,而打到屏幕上的日志却是正常的。不能理解,暂时我也不知道如何避免。
#!/usr/bin/python27 #coding:utf-8 import threading from os.path import exists,basename from os import mkdir,getcwd,sep import sys import time import logging logging.basicConfig(level=logging.DEBUG,format='%(asctime)s (%(threadName)-2s) %(message)s',) class fileso(): conf_dir = getcwd() + '/conf' conf_filename = "config.py" def init(self,conf_dir=None,conf_filename=None): if conf_dir: self.conf_dir = conf_dir conf_dir = self.conf_dir sys.path.append(conf_dir) if conf_filename: self.conf_filename = conf_filename conf_filename = conf_dir + sep + self.conf_filename if exists(conf_dir): if exists(conf_filename): return True else: print "没有在",conf_dir,"中找到测试主配置文件:",conf_filename else: mkdir(conf_dir) with open(conf_filename,'w+') as conf_file: print "开始初始化主配置文件...." conf_contant = """ #coding:utf-8 # 设置输出日志的格式,默认只输出消息. # 如: -f fn 或 -f f lineno,dt,tn,pid # 可使用的格式符号: # ln 显示日志级别名称. # fp 显示此程序的绝对路径. # fn 显示当前执行程序名. # funcn 显示打印日志的函数名. # lineno 显示日志的行号. # dt 显示日期时间. # tn 显示进程名. # pid 显示进程pid. logformat = "" # 设置输出日志的文件名: logfilename = "/tmp/mytest.log" # so库函数的位置: so_file = "/tmp/libPSBCSM3JDLL.so" # C语言类型与测试工具需要的类型的对应表 # #-------------------------------------------------- # C type =============== TEST type #-------------------------------------------------- # # char ---------------- strchar (1-character string) # char* ---------------- strchar* # # char ----------------- char (byte char) # unsigned char -------- unsigned char (byte char) # unsigned char* ------- unsigned char* (byte char) # wchar_t -------------- c_wchar (1-character unicode string) # wchar_t* ------------- c_wchar_p (unicode or None) # # short ---------------- short # unsigned short ------- unsigned short # int ------------------ int # int8 ----------------- int8 (8bit int) # int16 ---------------- int16 (16bit int) # int32 ---------------- int32 (32bit int) # int64 or __int64 ----- int64 (64bit int) # unsigned int8 -------- unsigned int8 (8bit unsigned int) # unsigned int16 ------- unsigned int16 (16bit unsigned int) # unsigned int32 ------- unsigned int32 (32bit unsigned int) # unsigned int64 ------- unsigned int64 (64bit unsigned int) # long ----------------- long # unsigned long -------- unsigned long # long long ------------ long long # # float ---------------- float # double --------------- double # # bool ----------------- bool # void* ---------------- void* # #-------------------------------------------------- # # so库函数所有的参数类型 so_args = ( #['FunctionsName','Func_return_type','Func_type1','Func_type2',...] ["TestFuncsName1", "int", "unsigned char*", "unsigned char*", "unsigned char*", "unsigned char*", "int", "int", "int", "__int64", "int"], # <--- Note:"," ["TestFuncsName2", "int", "unsigned char*", "unsigned char*", "unsigned char*", "unsigned char*", "int", "int", "int", "__int64", "int"], # <--- Note:"," ) # 给so库函数提供的所有测试数据. # 下面是为了更清晰而分行写了,它们是可以写成一行的. # 写成一行的格式: # so_value = [ (["函数名",'第一个参数','第二个参数','第三个参数',....]), ] # 请注意书写格式: 特别是逗号; 如------------------------------------- ^ ,这个。 so_value = [ #(["FunctionName",'Func_parameter1','Func_parameter2','Func_parameter3',....]), ("TestFuncsName1", ['0x1234567812345678123456781234567812345678123456781234567812345678', '12345678901', '123456', '123456', 0, #<---- 注:若整数类型的,可直接写. 2, 6, 20160101000000, 0 ], # <--- 注意:"," ['0x1234567812345678123456781234567812345678123456781234567812345678', '12345678901', '123456', '123456', 0, 2, 6, 20160101000000, 0 ], # <--- Note:"," ), # <--- Note:"," ("TestFuncsName2", ['0x1234567812345678123456781234567812345678123456781234567812345678', '12345678901', '123456', '123456', 0, 2, 6, 20160101000000, 0 ], # <--- Note:"," ['0x1234567812345678123456781234567812345678123456781234567812345678', '12345678901', '123456', '123456', 0, 2, 6, 20160101000000, 0 ], ), # <--- Note:"," ] """ conf_file.write(conf_contant) conf_file.flush() conf_file.close() print "初始化完成,现在可以修改主配置文件:",conf_filename, \ '了\n修改完成后,直接执行',sys.argv[0],'即可.\n查看帮助可输入:',sys.argv[0],' -h' \ '若需要重新初始化,则直接删除:',conf_filename sys.exit(0) osFuncs = {} conf_fobj = None logformat = None logfilename = None multiThread = False # 注: 我对C语言不是太懂,以前学了皮毛,都还老师了, #故:这里的C类型对应,是按PythonDOC中的说明写的。其中char存在歧义,故做了一点修改, #还有int64不是太确定以下定义是否正确。 dict_ctype = {"strchar":'c_char',"strchar*":'c_char_p', "wchar_t":'c_wchar',"wchar_t*":'c_wchar_p', "char":'c_byte', "unsigned char":'c_ubyte',"unsigned char*":'c_char_p', "short":'c_short', "unsigned short":'c_ushort', "int":'c_int',"int8":'c_uint8',"int16":'c_uint16', "int32":'c_uint32',"int64":'c_uint64', "unsigned int8":'c_uint8',"unsigned int16":'c_uint16', "unsigned int32":'c_uint32',"unsigned int64":'c_uint64', "long":'c_long', "unsigned long":'c_ulong', "__int64":'c_longlong',"long long":'c_longlong', "unsigned __int64":'c_ulonglong',"unsigned long long":'c_ulonglong', "float":'c_float', "double":'c_double', "bool":'c_bool', "void*":'c_void_p', } def __init__(self,conf_dir=None,conf_filename=None): self.init(conf_dir,conf_filename) self.conf_fobj = __import__(self.conf_filename.rstrip('.py')) self.logfilename = self.conf_fobj.logfilename self.logformat = self.conf_fobj.logformat.split(',') def setMultiThread(self,mBool): self.multiThread = mBool def convertType_c2t(self): """The function is c type convert test type. """ fargs = self.conf_fobj.so_args funcs = {} paras = "" retargs = {} for l in fargs: # Second paras: return type of the function retargs[l[0]]=(self.dict_ctype[l[1]]) for i in range(len(l[2:])): paras = paras + "," + self.dict_ctype[l[i+2]] # First paras: function name of os file in funcs[l[0]] = paras.lstrip(',') paras = "" return funcs,retargs def handleValue_t2c(self,values,paras): """The function is test value convert c value.""" count = 0 vStr = "" intlong = ("c_byte","c_ubyte","c_short","c_ushort","c_int","c_long", \ "c_ulong","c_longlong","c_ulonglong","c_uint8","c_uint16", \ "c_uint32","c_uint64",) for p in paras.split(','): if p == "c_bool": # c type:_Bool = py type:bool(1) vStr = vStr + ',' + str(values[count]) elif p == "c_char": # c type:char = py type: 1-character string vStr = vStr + ',"' + values[count] + '"' elif p == "c_wchar": # c type:wchar_t = py type: 1-character unicode string vStr = vStr + ',"' + values[count] + '"' elif p in intlong: # c type:intlong = py type: int/long vStr = vStr + ',' + str(values[count]) elif p in ('c_float','c_double','c_longdouble'): # c type:() = py type: float vStr = vStr + ',' + str(values[count]) elif p == 'c_char_p': # c type:char* = py type:string or None vStr = vStr + ',"' + values[count] + '"' elif p == 'c_wchar_p': # c type:wchar_t* = py type:unicode or None vStr = vStr + ',"' + values[count] + '"' elif p == 'c_void_p': # c type:void* = py type:int/long or None vStr = vStr + ',"' + str(values[count]) + '"' count += 1 return vStr.lstrip(',') # 注:这里不是注释,预览看这里变成注释了; def content_join(self,mainContent): imp_section_content = """ #coding:utf-8 from ctypes import * import logging from logging.handlers import RotatingFileHandler """ format = "" for f in self.logformat: if f == 'ln': format = format + ' %(levelname)s' elif f == 'fp': format = format + ' %(pathname)s' elif f == 'fn': format = format + ' %(filename)s' elif f == 'funcn': format = format + ' %(funcName)s' elif f == 'lineno': # output: log row number. format = format + ' %(lineno)d' elif f == 'dt': # output: data time. format = format + ' %(asctime)s' elif f == 'tn': format = format + ' %(threadName)s' elif f == 'pid': format = format + ' %(process)d' format = format + ' %(message)s' log_section_content = 'format="' + format + '"\n' + 'logfilename="' + self.logfilename + '"' + """ log = logging.getLogger() Rthandler = RotatingFileHandler(logfilename,maxBytes=20*1024*1024,backupCount=5) Rthandler.setFormatter(logging.Formatter(format)) log.addHandler(Rthandler) log.setLevel(logging.INFO) """ main_section_content = "fso='" + self.conf_fobj.so_file + "'" + ''' cdll.LoadLibrary(fso) openFso = CDLL(fso) ''' + mainContent content = imp_section_content + '\n' + log_section_content + '\n' + main_section_content return content def constructor(self): '''功能: 合成调用so库函数的语句; 语句有三种: 1.指定so库函数的参数都是什么类型; 2.指定so库函数的返回值是什么类型; 3.指定so库函数需要的所有参数值. ''' # convertType_c2t return 2 value: # funcs={"funcName":"funcParaTypes"} # retargs={funcName:returnParaType} funcs,retargs = self.convertType_c2t() mainContentAll = "" if self.multiThread: compile_main = {} compile_common = compile(self.content_join(""),'','exec') self.osFuncs[compile_common] = compile_main # paraValues=('funcName',['paras1','paras2',...],[...],...) # so_value=[paraValues,paraValues,...] for paraValues in self.conf_fobj.so_value: for k_funcName,v_funcStrParaTypes in funcs.items(): if paraValues[0] == k_funcName: count = 0 for groupval in paraValues[1:]: funcStrParaValues = self.handleValue_t2c(groupval,v_funcStrParaTypes) #指定so库函数的参数都是什么类型; funcName_type_join = 'openFso.' + k_funcName + '.argtpes=[' + v_funcStrParaTypes + ']' #指定so库函数的返回值是什么类型; funcName_retType_join = 'openFso.' + k_funcName + '.restype=' + retargs[k_funcName] #指定so库函数需要的所有参数值. call_func_join = 'openFso.' + k_funcName + '(' + funcStrParaValues + ')' output_msg_c1 = "log.info('测试:" + str(count) + "')" output_msg_c2 = "log.info('测试内容:" + call_func_join + "')" exec_func_join = "exec_func = " + call_func_join output_msg_c3 = "log.info('测试结果:'+str(exec_func))" mainContent = funcName_type_join + '\n' + \ funcName_retType_join + '\n' + \ output_msg_c1 + '\n' + \ exec_func_join + '\n' + \ output_msg_c2 + '\n' + \ output_msg_c3 + '\n' # 生成一个文件中包含so库的所有函数 mainContentAll = mainContentAll + '\n' + mainContent if self.multiThread: # 每个元素是一个文件,一个文件中只测试一个so库函数 if count == 0: compile_custom_test = [] #print 'k_funcName=',k_funcName,'\nmainContent=',mainContent compile_custom_test.append(compile(mainContent,'','exec')) compile_main[k_funcName]=compile_custom_test count += 1 return self.content_join(mainContentAll) def starttest(self): ''' 单线程测试. ''' exec self.constructor() print "提示:请查看输出日志文件:",self.logfilename def mstarttest(self): ''' 根据测试函数的个数启动相应数量的线程. ''' self.setMultiThread(True) self.constructor() self.run() def num_starttest(self,num): """启动指定倍数的线程""" self.setMultiThread(True) self.constructor() for i in range(num): self.run('--'+str(i)+"轮") def run(self,mark=""): if self.osFuncs: # self.osFuncs=[compile_common_code1,{"funcName":[compile_custom_test_para1,...]},{..},..] k_common_code, = self.osFuncs all_paras_code = self.osFuncs[k_common_code] # condition: 它必须是全局条件锁,否则它将失去它的作用. condition = threading.Condition() worker = [] i = 0 for k_funcName,v_execcode in all_paras_code.items(): funcCode = (k_common_code,v_execcode,) w = threading.Thread(name='Worker'+str(i)+mark,target=self.worker,args=(condition,funcCode,)) worker.append(w) i += 1 b = threading.Thread(name='Boss',target=self.boss,args=(condition,)) for t in worker: t.start() b.start() def boss(self,condition): logging.debug('多线程测试开始...') # 这里用with语句写的,但其实,我对它并不是很了解,只是照书上写的。如果有了解的,也可以多交流,呵呵。。 with condition: #这里调用notifyAll同时通知所有已经处于wait状态的子线程,使它们同时启动起来。实现一个简单的并行压测。 condition.notifyAll() def worker(self,condition,codefile): #with condition就类似于if condition.acquire():,来判断是否获取了锁; #with语句的优势: 似乎是可以通过__exit__()来释放锁;这个我不确定,可能理解错误。 #这篇文章,介绍with语句的用法:http://blog.csdn.net/suwei19870312/article/details/23258495 with condition: #线程执行wait()方法时,会处于等待状态,它只有接受到notify通知后,才能获得锁。 condition.wait() exec codefile[0] for codeObj in codefile[1]: #print 'codeObj=',codeObj exec codeObj help_msg = '使用格式:' + sys.argv[0] + ' [-h | -f | -mf | -mn]\n' + \ '参数:\n' + \ '\t-h 显示帮助信息.\n\t-mf 启动线程数=测试函数的个数\n' + \ '\t-mn 启动线程数=测试函数的个数 x 指定的数.如: -mn 3 就是启动3倍的函数个数.\n' def handle_paras(): fso = fileso() count = 1 for args in sys.argv[1:]: if args in ('/h','-h','--help'): print help_msg sys.exit(0) elif args == '-mf': fso.mstarttest() return elif args == '-mn': Num = int(sys.argv[count+1]) fso.num_starttest(Num) return fso.starttest() if __name__ == '__main__': handle_paras()
原创文章,作者:Wn1m,如若转载,请注明出处:http://www.178linux.com/10758