1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# epool_service.py

import queue
import select
import socket
import selectors

selectors.EVENT_READ
sel = selectors.DefaultSelector()


sk = socket.socket()
sk.setblocking(False)  # 设置非阻塞
address = ('localhost', 12345)
sk.bind(address)
print(f'绑定套接字到本机端口 12345')
sk.listen()

message = {}
TIMEOUT = 5

ep = select.epoll()  # 初始化一个epoll事件
ep.register(sk.fileno(), select.EPOLLIN)  # 注册读事件
fd_to_socket = {sk.fileno(): sk}  # 文件句柄映射到对应的套接字实例

def read_event(fd):
    s = fd_to_socket[fd]
    data = s.recv(1024)  # 读事件 读取数据
    if data:
        print(f'接收到来自{s.getpeername()}的数据: {data}')
        message[s].put(data)  # 处理接收到的信息
        ep.modify(fd, select.EPOLLOUT)  # 接收过数据后,将该句柄的修改并加入写事件集合
    else:
        # ep.modify(fd, select.EPOLLHUP)  # 接收到空数据, 挂起
        socket_close(fd)

def write_event(fd):
    s = fd_to_socket[fd]
    try:
        next_msg = message[s].get_nowait()
    except queue.Empty:
        print(f'连接{s.getpeername()}已经没有等待任务,将被关闭')
        ep.modify(fd, select.EPOLLIN)
    else:
        print(f'回写数据{next_msg}{s.getpeername()}')
        s.send(next_msg)  # 发送消息如果该套接字上任务全完成 移出该套接字

def socket_close(fd):
    print(f'收到关闭事件,关闭客户端 {cur_sk.getpeername()} 连接')
    ep.unregister(fd)
    fd_to_socket[fd].close()
    del fd_to_socket[fd]


while True:
    try:
        print(f'等待连接。。。。。')
        events = ep.poll(TIMEOUT)  # epoll中的所有事件集合

        if not events:
            print(f'{ep}超时无连接,重新轮询...')
            continue

        for fd, event in events:
            cur_sk = fd_to_socket[fd]  # 在映射中找到该句柄对应的套接字
            if cur_sk is sk:  # 如果是主服务, 表明有新的客户端连接过来
                connection, sk_info = sk.accept() # 接收连接
                print(f'新连接信息: {sk_info}')
                connection.setblocking(False)  # 变成非阻塞
                ep.register(connection.fileno(), select.EPOLLIN) # 注册读事件
                # 将该句柄与对应的套接字关联起来 并且写入消息
                fd_to_socket[connection.fileno()] = connection
                message[connection] = queue.Queue()

            elif event & select.EPOLLIN:
                # 处理读事件
                read_event(fd)

            elif event & select.EPOLLOUT:
                # 处理写事件
                write_event(fd)

            elif event & select.EPOLLHUP:
                # 连接关闭事件
                socket_close(fd)

    except KeyboardInterrupt:
        ep.close()
        print('退出。', ep.closed)
        break
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# select_service.py

import select
import socket
import queue

sk = socket.socket()
sk.setblocking(False)  # 设置非阻塞
address = ('localhost', 12345)
sk.bind(address)
print(f'绑定套接字到本机端口 12345')
sk.listen()

inputs = [sk]  # 用于检测读入的套接字
outputs = []  # 用于检测写出事件的套接字
message = {}  # 存储信息


def socket_close(s: socket.socket):
    if s in outputs:
        outputs.remove(s)
    inputs.remove(s)
    s.close()
    del message[s]

def read_event(s: socket.socket):
    # 如果是新的套接字是主连接的套接字, 说明有新客户端连接进来了
    if s is sk:
        connection, client_info = s.accept()  # 该套接字监控其他连接
        print(f'新连接信息:{client_info}')
        connection.setblocking(False)  # 改成非阻塞
        inputs.append(connection)  # 添加新连接到监控
        message[connection] = queue.Queue()  # 给它准备好要发送的数据队列
    else:
        # 与已发送数据的客户端建立连接 获取已发送数据
        data = s.recv(1024)
        if data:
            print(f'接收到来自{s.getpeername()}的数据: {data}')
            message[s].put(data)  # 处理接收到的信息
            if s not in outputs:
                outputs.append(s)
        else:
            # 没有数据关闭连接
            socket_close(s)

def write_event(s: socket.socket):
    try:
        # 非阻塞获取该套接字上的任务
        next_msg = message[s].get_nowait()
    except queue.Empty:
        print(f'连接{s.getpeername()}已经没有等待任务,将被关闭')
        outputs.remove(s)  # 如果该套接字上任务全完成 移出该套接字
    else:
        print(f'回写数据{next_msg}{s.getpeername()}')
        s.send(next_msg)  # 发送消息

if __name__ == '__main__':
    while inputs:
        print(f'等待新的事件连接进来......')
        readable, writable, exceptional = select.select(inputs, outputs, inputs)
        for s in readable:
            read_event(s)
        for s in writable:
            write_event(s)
        for s in exceptional:
            socket_close(s)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# client.py

import socket

messages = [
    'hello world',
    'who are you',
    'i am cjr'
]

server_address = ('localhost', 12345)

socks = [
    socket.socket(),
    socket.socket()
]

# 把所有的套接字都连接到服务器端口
for s in socks:
    s.connect(server_address)

for msg in messages:
    data = msg.encode()
    for s in socks:
        print(f'send {data} to {s.getpeername()}')
        s.send(data)

    for s in socks:
        data = s.recv(1024)
        print(f'received {data} from {s.getpeername()}')
        if not data:
            s.close()

评论 0

😆😌🤪