# epool_service.py
import queue
import select
import socket
import selectors
selectors.EVENT_READ
sel = selectors.DefaultSelector()
sk = socket.socket()
sk.setblocking(False) # 设置非阻塞
address = ('localhost', 12345)
sk.bind(address)
print(f'绑定套接字到本机端口 12345')
sk.listen()
message = {}
TIMEOUT = 5
ep = select.epoll() # 初始化一个epoll事件
ep.register(sk.fileno(), select.EPOLLIN) # 注册读事件
fd_to_socket = {sk.fileno(): sk} # 文件句柄映射到对应的套接字实例
def read_event(fd):
s = fd_to_socket[fd]
data = s.recv(1024) # 读事件 读取数据
if data:
print(f'接收到来自{s.getpeername()}的数据: {data}')
message[s].put(data) # 处理接收到的信息
ep.modify(fd, select.EPOLLOUT) # 接收过数据后,将该句柄的修改并加入写事件集合
else:
# ep.modify(fd, select.EPOLLHUP) # 接收到空数据, 挂起
socket_close(fd)
def write_event(fd):
s = fd_to_socket[fd]
try:
next_msg = message[s].get_nowait()
except queue.Empty:
print(f'连接{s.getpeername()}已经没有等待任务,将被关闭')
ep.modify(fd, select.EPOLLIN)
else:
print(f'回写数据{next_msg}到{s.getpeername()}')
s.send(next_msg) # 发送消息如果该套接字上任务全完成 移出该套接字
def socket_close(fd):
print(f'收到关闭事件,关闭客户端 {cur_sk.getpeername()} 连接')
ep.unregister(fd)
fd_to_socket[fd].close()
del fd_to_socket[fd]
while True:
try:
print(f'等待连接。。。。。')
events = ep.poll(TIMEOUT) # epoll中的所有事件集合
if not events:
print(f'{ep}超时无连接,重新轮询...')
continue
for fd, event in events:
cur_sk = fd_to_socket[fd] # 在映射中找到该句柄对应的套接字
if cur_sk is sk: # 如果是主服务, 表明有新的客户端连接过来
connection, sk_info = sk.accept() # 接收连接
print(f'新连接信息: {sk_info}')
connection.setblocking(False) # 变成非阻塞
ep.register(connection.fileno(), select.EPOLLIN) # 注册读事件
# 将该句柄与对应的套接字关联起来 并且写入消息
fd_to_socket[connection.fileno()] = connection
message[connection] = queue.Queue()
elif event & select.EPOLLIN:
# 处理读事件
read_event(fd)
elif event & select.EPOLLOUT:
# 处理写事件
write_event(fd)
elif event & select.EPOLLHUP:
# 连接关闭事件
socket_close(fd)
except KeyboardInterrupt:
ep.close()
print('退出。', ep.closed)
break