前言
最近同学让我帮忙写一个测试网络的工具。由于工作上的事情,断断续续地拖了很久才给出一个相对完整的版本。其实,我Python用的比较少,所以基本都是边查资料边写程序。
程序的主要逻辑如下:
读取一个excel文件中的ip列表,然后使用多线程调用ping统计每个ip的网络参数,最后把结果输出到excel文件中。
代码如下所示:
#! /usr/bin/env python
# -*- coding: UTF-8 -*-
# File: pingtest_test.py
# Date: 2008-09-28
# Author: Michael Field
# Modified By:intheworld
# Date: 2017-4-17
import sys
import os
import getopt
import commands
import subprocess
import re
import time
import threading
import xlrd
import xlwt
TEST = [
'220.181.57.217',
'166.111.8.28',
'202.114.0.242',
'202.117.0.20',
'202.112.26.34',
'202.203.128.33',
'202.115.64.33',
'202.201.48.2',
'202.114.0.242',
'202.116.160.33',
'202.202.128.33',
]
RESULT={}
def usage():
print "USEAGE:"
print "\t%s -n TEST|excel name [-t times of ping] [-c concurrent number(thread nums)]" %sys.argv[0]
print "\t TEST为简单测试的IP列表"
print "\t-t times 测试次数;默认为1000;"
print "\t-c concurrent number 并行线程数目:默认为10"
print "\t-h|-"
print "\t 输出为当前目录文件ping_result.txt 和 ping_result.xls"
print "for example:"
print "\t./ping_test.py -n TEST -t 1 -c 10"
def div_list(ls,n):
if not isinstance(ls,list) or not isinstance(n,int):
return []
ls_len = len(ls)
print 'ls length = %s' %ls_len
if n<=0 or 0==ls_len:
return []
if n > ls_len:
return []
elif n == ls_len:
return [[i] for i in ls]
else:
j = ls_len/n
k = ls_len%n
### j,j,j,...(前面有n-1个j),j+k
#步长j,次数n-1
ls_return = []
for i in xrange(0,(n-1)*j,j):
ls_return.append(ls[i:i+j])
#算上末尾的j+k
ls_return.append(ls[(n-1)*j:])
return ls_return
def pin(IP):
try:
xpin=subprocess.check_output("ping -n 1 -w 100 %s" %IP, shell=True)
except Exception:
xpin = 'empty'
ms = '=[0-9]+ms'.decode("utf8")
print "%s" %ms
print "%s" %xpin
mstime=re.search(ms,xpin)
if not mstime:
MS='timeout'
return MS
else:
MS=mstime.group().split('=')[1]
return MS.strip('ms')
def count(total_count,I):
global RESULT
nowsecond = int(time.time())
nums = 0
oknums = 0
timeout = 0
lostpacket = 0.0
total_ms = 0.0
avgms = 0.0
maxms = -1
while nums < total_count:
nums += 1
MS = pin(I)
print 'pin output = %s' %MS
if MS == 'timeout':
timeout += 1
lostpacket = timeout*100.0 / nums
else:
oknums += 1
total_ms = total_ms + float(MS)
if oknums == 0:
oknums = 1
maxms = float(MS)
avgms = total_ms / oknums
else:
avgms = total_ms / oknums
maxms = max(maxms, float(MS))
RESULT[I] = (I, avgms, maxms, lostpacket)
def thread_func(t, ipList):
if not isinstance(ipList,list):
return
else:
for ip in ipList:
count(t, ip)
def readIpsInFile(excelName):
data = xlrd.open_workbook(excelName)
table = data.sheets()[0]
nrows = table.nrows
print 'nrows %s' %nrows
ips = []
for i in range(nrows):
ips.append(table.cell_value(i, 0))
print table.cell_value(i, 0)
return ips
if __name__ == '__main__':
file = 'ping_result.txt'
times = 10
network = ''
thread_num = 10
args = sys.argv[1:]
try:
(opts, getopts) = getopt.getopt(args, 'n:t:c:h"\nInvalid command line option detected."
usage()
sys.exit(1)
for opt, arg in opts:
if opt in ('-n'):
network = arg
if opt in ('-h', '-"sheet1", cell_overwrite_ok=True)
if not isinstance(times,int):
usage()
sys.exit(0)
if network not in ['TEST'] and not os.path.exists(os.path.join(os.path.dirname(__file__), network)):
print "The network is wrong or excel file does not exist. please check it."
usage()
sys.exit(0)
else:
if network == 'TEST':
ips = TEST
else:
ips = readIpsInFile(network)
print 'Starting...'
threads = []
nest_list = div_list(ips, thread_num)
loops = range(len(nest_list))
print 'Total %s Threads is working...' %len(nest_list)
for ipList in nest_list:
t = threading.Thread(target=thread_func,args=(times,ipList))
threads.append(t)
for i in loops:
threads[i].start()
for i in loops:
threads[i].join()
it = 0
for line in RESULT:
value = RESULT[line]
sheet1.write(it, 0, line)
sheet1.write(it, 1, str('%.2f'%value[1]))
sheet1.write(it, 2, str('%.2f'%value[2]))
sheet1.write(it, 3, str('%.2f'%value[3]))
it+=1
f.write(line + '\t'+ str('%.2f'%value[1]) + '\t'+ str('%.2f'%value[2]) + '\t'+ str('%.2f'%value[3]) + '\n')
f.close()
workbook.save('ping_result.xls')
print 'Work Done. please check result %s and ping_result.xls.'%file
这段代码参照了别人的实现,虽然不是特别复杂,这里还是简单解释一下。
- excel读写使用了xlrd和xlwt,基本都是使用了一些简单的api。
- 使用了threading实现多线程并发,和POSIX标准接口非常相似。thread_func是线程的处理函数,它的输入包含了一个ip的List,所以在函数内部通过循环处理各个ip。
- 此外,Python的commands在Windows下并不兼容,所以使用了subprocess模块。
到目前为止,我对Python里面字符集的理解还不到位,所以正则表达式匹配的代码并不够强壮,不过目前勉强可以工作,以后有必要再改咯!
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对的支持。
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。