Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。
注意:Python的select()方法适用于UNIX操作系统,不支持Windows操作系统
接下来通过echo server例子要以了解select 是如何通过单进程实现同时处理多个非阻塞的socket连接的。
import select import socket import sys import Queue # Create a TCP/IP socket server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(0) # Bind the socket to the port server_address = ('localhost', 10000) print >>sys.stderr, 'starting up on %s port %s' % server_address server.bind(server_address) # Listen for incoming connections server.listen(5)
select()方法接收并监控3个通信列表, 第一个是所有的输入的data,就是指外部发过来的数据,第2个是监控和接收所有要发出去的data(outgoing data),第3个监控错误信息,接下来我们需要创建2个列表来包含输入和输出信息来传给select()。
# Sockets from which we expect to read inputs = [ server ] # Sockets to which we expect to write outputs = [ ]
所有客户端的进来的连接和数据将会被server的主循环程序放在上面的list中处理,我们现在的server端需要等待连接可写(writable)之后才能过来,然后接收数据并返回(因此不是在接收到数据之后就立刻返回),因为每个连接要把输入或输出的数据先缓存到queue里,然后再由select取出来再发。
# Outgoing message queues (socket:Queue) message_queues = {}
通过服务器主循环将连接添加到这些列表并从这些列表中移除。由于服务器的这个版本在发送任何数据之前将等待套接字变得可写(而不是立即发送应答),所以每个输出连接都需要一个队列作为要通过它发送的数据的缓冲区。
下面是此程序的主循环,调用select()时会阻塞和等待直到新的连接和数据进来。
# Wait for at least one of the sockets to be ready for processing print >>sys.stderr, '\nwaiting for the next event' readable, writable, exceptional = select.select(inputs, outputs, inputs)
当你把inputs,outputs,exceptional(这里跟inputs共用)传给select()后,它返回3个新的list,我们上面将他们分别赋值为readable,
writable,exceptional, 所有在readable list中的socket连接代表有数据可接收(recv),所有在writable list中的存放着你可以对其进行发送(send)操作的socket连接,当连接通信出现error时会把error写到exceptional列表中。
Readable list 中的socket 可以有3种可能状态,第一种是如果这个socket是main "server" socket,它负责监听客户端的连接,如果这个main server socket出现在readable里,那代表这是server端已经ready来接收一个新的连接进来了,为了让这个main server能同时处理多个连接,在下面的代码里,我们把这个main server的socket设置为非阻塞模式。
# Handle inputs for s in readable: if s is server: # A "readable" server socket is ready to accept a connection connection, client_address = s.accept() print >>sys.stderr, 'new connection from', client_address connection.setblocking(0) inputs.append(connection) # Give the connection a queue for data we want to send message_queues[connection] = Queue.Queue()
第二种情况是这个socket是已经建立了的连接,它把数据发了过来,这个时候你就可以通过recv()来接收它发过来的数据,然后把接收到的数据放到queue里,这样你就可以把接收到的数据再传回给客户端了。
else: data = s.recv(1024) if data: # A readable client socket has data print >>sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) message_queues[s].put(data) # Add output channel for response if s not in outputs: outputs.append(s)
第三种情况就是这个客户端已经断开了,所以你再通过recv()接收到的数据就为空了,所以这个时候你就可以把这个跟客户端的连接关闭了。
else: # Interpret empty result as closed connection print >>sys.stderr, 'closing', client_address, 'after reading no data' # Stop listening for input on the connection if s in outputs:
outputs.remove(s) #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉。
inputs.remove(s) #inputs中也删除掉 s.close() #把这个连接关闭掉 # Remove message queue del message_queues[s]
对于writable list中的socket,也有几种状态,如果这个客户端连接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,否则就把这个连接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个连接,那就会认为这个连接还处于非活动状态。
# Handle outputs for s in writable: try: next_msg = message_queues[s].get_nowait() except Queue.Empty: # No messages waiting so stop checking for writability. print >>sys.stderr, 'output queue for', s.getpeername(), 'is empty' outputs.remove(s) else: print >>sys.stderr, 'sending "%s" to %s' % (next_msg, s.getpeername()) s.send(next_msg)
最后,如果在跟某个socket连接通信过程中出了错误,就把这个连接对象在inputs\outputs\message_queue中都删除,再把连接关闭掉。
# Handle "exceptional conditions" for s in exceptional: print >>sys.stderr, 'handling exceptional condition for', s.getpeername() # Stop listening for input on the connection inputs.remove(s) if s in outputs: outputs.remove(s) s.close() # Remove message queue del message_queues[s]
神龙|纯净稳定代理IP免费测试>>>>>>>>天启|企业级代理IP免费测试>>>>>>>>IPIPGO|全球住宅代理IP免费测试