下面给大家介绍python实现简易版的web服务器,具体内容详情大家通过本文学习吧!
1、请自行了解HTTP协议
https://www.jb51.net/article/133883.htm(点击跳转)
2、创建Socket服务,监听指定IP和端口
3、以阻塞方式等待客户端连接
4、读取客户端请求数据并进行解析
5、准备服务器运行上下文
6、处理客户端请求数据
7、根据用户请求路径读取文件
8、返回响应结果给客户端
9、程序入口
10、目录结构
11、运行
python wsgiserver.py app:run
12、源码
a.wsgiserver.py文件
#encoding:utf-8
import socket
import StringIO
import sys
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class WSGIServer(object):
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM
request_queue_size = 30
recv_size = 1024
def __init__(self, server_address):
self._listen_socket = _listen_socket = socket.socket(self.address_family,
self.socket_type) _listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1) _listen_socket.bind(server_address)
_listen_socket.listen(self.request_queue_size)
_host, _port = _listen_socket.getsockname()
self._server_name = socket.getfqdn(_host)
self._server_port = _port
self._headers_set = []
self._application = None
self._client = None
self._request_data = None
self._request_method = None
self._path = None
self._request_version = None
self._start_response = None
def set_application(self, application):
self._application = application
def server_forever(self):
_listen_socket = self._listen_socket
logger.info('listen on %s:%s', self._server_name, self._server_port) while 1:
try:
self._client, _addr = _listen_socket.accept()
self._handle_request(_addr)
except KeyboardInterrupt as e:
logger.info('interrupt')
break
except BaseException as e:
logger.error(e)
def _handle_request(self, client_addr):
self._request_data = _request_data = self._client.recv(self.recv_size)
self._parse_request_data(_request_data)
_env = self._get_environment(client_addr)
_result = self._application(_env, self.start_response)
self._finish_response(_result)
def _parse_request_data(self, request_data):
_request_line = str(request_data.splitlines()[0]).rstrip('\r\n')
(self._request_method, self._path, self._request_version) = _request_line.split()
def _get_environment(self, client_addr):
_env = {}
_env['wsgi.version'] = (1, 0)
_env['wsgi.url_scheme'] = 'http'
_env['wsgi.input'] = StringIO.StringIO(self._request_data) _env['wsgi.errors'] = sys.stderr
_env['wsgi.multithread'] = False
_env['wsgi.multiprocess'] = False
_env['wsgi.run_once'] = False
_env['REQUEST_METHOD'] = self._request_method.upper()
_env['PATH_INFO'] = self._path
_env['SERVER_NAME'] = self._server_name
_env['SERVER_PORT'] = self._server_port
_env['HTTP_CLIENT_IP'] = client_addr[0]
logger.info('%s %s %s %s', _env['HTTP_CLIENT_IP'], datetime.now().strftime('%Y-%m-%d %H:%M:%S'), _env['REQUEST_METHOD'], _env['PATH_INFO'])
return _env
def start_response(self, status, response_headers, exc_info=None): _server_headers = [
('Date', 'Sun, 7 Jun 2015 23:07:04 GMT'),
('Server', 'WSGIServer 0.1')
]
self._headers_set = [status, response_headers + _server_headers]
def _finish_response(self, result):
_status, _response_headers = self._headers_set
_response = 'HTTP/1.1 {status}\r\n'.format(status=_status) for _header in _response_headers:
_response += '{0}:{1}\r\n'.format(*_header)
_response += '\r\n'
for _data in result:
_response += _data
self._client.sendall(_response)
self._client.close()
def make_server(server_address, application):
server = WSGIServer(server_address)
server.set_application(application)
return server
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
server_addr= ('0.0.0.0', 43002)
app_path = sys.argv[1]
module, application = app_path.split(':')
module = __import__(module)
application = getattr(module, application)
httpd = make_server(server_addr, application)
httpd.server_forever()
b.app.py文件
#encoding:utf-8
import os
class PageNotFoundException(BaseException):
pass
def render(filename, dirname='html'):
_path = os.path.join(dirname, filename)
if os.path.exists(_path):
with open(_path, 'rb') as handler:
return handler.read()
raise PageNotFoundException('file not found:%s' % _path)
def run(env, start_response):
_path = env.get('PATH_INFO')
response = ''
try:
_path = 'index.html' if _path == '/' else _path[1:]
if _path.endswith('.css'):
start_response('200 OK', [('Content-Type', 'text/css')])
elif _path.endswith('.js'):
start_response('200 OK', [('Content-Type', 'text/javascript')]
elif _path.endswith('.html'):
start_response('200 OK', [('Content-Type', 'text/html')])
else:
start_response('200 OK', [('Content-Type', 'text/plain'), ('Content-Disposition', 'attachment; filename=%s' % os.path.basename(_path))])
response = render(_path)
except PageNotFoundException as e:
response = render('404.html')
return [response, '\r\n']
总结
以上所述是小编给大家介绍的Python实现简易版的Web服务器,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。









