暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用python 的【Socket】方式实践五个IO模型的示例

小儿来一壶枸杞酒泡茶 2021-04-28
553

上次梳理关于【IO】模型的时候,仅停留于文字上,这次基于Socket的的方式使用网络IO的模式还原一下相关的IO模型。


1 Socket 基础简介

PS:这里先不展开很详细的关于OSI模型的,后续关于网络编程专题的时候再去整理。

来自百度百科的解释(纯手打加深印象):

套接字(Socket):

  • 是对网络中不同主机上的应用进程之间进行**双向通信*的端点的抽象描述。

  • 一个套接字就是网络上进程通信的一端,提供了应用层进程利用网络协议交换数据的机制。

  • 从所处的地位来讲,套接字上联应用进程,下联网络协议栈

    • 是应用程序通过网络协议进行通信的接口
    • 是应用程序与网络协议根进行交互的接口
  • 某种意义上Socket(套接字)也可以理解为是操作系统内核中的一个数据结构.它几乎是所有网络通信的基础。

  • socket也有一个类似于打开文件的函数调用,该函数返回一个整型的socket描述符,随后的连接建立、数据传输等操作都是通过socket描述符来实现的。

表示方法:

Socket是由IP地址和端口结合的,提供向应用层进程传送数据包的机制 。一个SOCKER= (ip:端口号) 首先理解网络通信:通常是指不同计算机上的进程间通信。

网络通信可以理解是Socket之间的互联,所以Socket之间的互联必不可缺的要素有:

  • IP地址:网络节点就是指计算机或路由都上的一个网络地址,也就是IP地址。

PS:IP地址只能确定进程所在的计算机

  • 端口号:在计算机上一个端口号一次只能分配给一个进程,端口号和进程之间是一 一对应的关系,端口号的范围从0~65535。

端口号+网络地址的组合= 可以唯一的确定整个网络中的一个网络进程。

PS:一个完整的套接字则用一个相关描述{协议、本地地址、本地端口、远程地址、远程端口}来表示。

一个完整的套接字:客户端侧(本地地址:本地端口)+服务端测(远程地址:远程端口)

Socket 之间的连接过程可以分为三个步骤:

  • (1)服务器监听;
  • (2)客户端连接;
  • (3)连接确认。

关于端口号耗尽的理解

之前时常听说端口号耗尽,其实理解上有点误区,端口号耗尽和我们的Time wait中的链接数超限应该是不同的概念,端口号耗尽应该是偏向于我们的客户端侧的端口。我们每个客户端和服务端指定的IP:端口链接的时候,客户端会默认的随机使用本地的一个端口来进行和服务端进行连接,所以通常说的本地端口耗尽,应该是客户端侧,通常我们的压测,一般也是使用客户端进行压测,当然要看压测的方式,开启的链接数的多少。

类型:

  • 1.流套接字(SOCK_STREAM)

    流套接字用于提供面向连接、可靠的数据传输服务。该服务将保证数据能够实现无差错、无重复送,并按顺序接收。流套接字之所以能够实现可靠的数据服务,原因在于其使用了传输控制协议,即TCP(The Transmission Control Protocol)协议

  • 2.数据报套接字(SOCK_DGRAM)

    数据报套接字提供一种无连接的服务。该服务并不能保证数据传输的可靠性,数据有可能在传输过程中丢失或出现数据重复,且无法保证顺序地接收到数据。数据报套接字使用UDP( User DatagramProtocol)协议进行数据的传输。由于数据报套接字不能保证数据传输的可靠性,对于有可能出现的数据丢失情况,需要在程序中做相应的处理。

  • 3.原始套接字(SOCK_RAW)

    原始套接字与标准套接字(标准套接字指的是前面介绍的流套接字和数据报套接字)的区别在于:原始套接字可以读写内核没有处理的IP数据包,而流套接字只能读取TCP协议的数据,数据报套接字只能读取UDP协议的数据。因此,如果要访问其他协议发送的数据必须使用原始套接。

Socket几个特点:

根据套接字的不同类型,可以将套接字调用分为面向连接服务(TCP)和无连接服务  (UDP)。TCP主要特点如下:

  • (1)数据传输过程必须经过建立连接、维护连接和释放连接3个阶段(三次握手和四次挥手);
  • (2)在传输过程中,各分组不需要携带目的主机的地址 ;
  • (3)可靠性好,但由于协议复杂,通信效率不高。

UDP主要特点如下:

  • (1)不需要连接的各个阶段;
  • (2)每个分组都携带完整的目的主机地址,在系统中独立传送(数据报的方法);
  • (3)由于没有顺序控制,所以接收方的分组可能出现乱序、重复和丢失现象;
  • (4)通信效率高,但可靠性不能确保。

Socket工作流程:

互联网进行通信=一对套接字=客户端侧socke(本地地址:本地端口)+服务端侧socke(远程地址:远程端口)

Socket通信实现步骤解析:

Step 1:创建Server Socket和Client Socket

Step 2:打开连接到的Socket的输入/输出流

Step 3:按照协议对Socket进行读/写操作

Step 4:关闭输入输出流,以及Socket

Socket缓冲区

当一个socket创建之后,操作系统会在内核分配两个缓冲区:

  • 输入缓冲区
  • 输出缓冲区

socket发送的数据:

  • 1:先写入缓冲区内(不会直接向网络中传输)
  • 2:再由TCP 协议将数据从输出缓冲区发送到目标主机。

2 五种IO模式的示例

  • IO模型介绍

IO发生时涉及的对象和步骤。

如网络IO情况下:

对于一个网络IO(network IO),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

  • #1)等待数据准备 (Waiting for the data to be ready)

  • #2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

所以IO模型的区别就是在两个阶段上各有不同的情况。

2.1 python下阻塞式IO


阻塞式IO中两个阶段情况:

wait data 等数据的这个阶段是阻塞
copy data 拷贝阶段还是要阻塞

服务端代码

import socket
# 明确配置变量
ip_port = ('127.0.0.1',8080)
# 最多可以连接多少个客户端
listen_num = 5
# 接受套接字的大小
buffer_size = 1024
# 套接字类型AF_INET, socket.SOCK_STREAM tcp协议,基于流式的协议
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 对socket的配置重用ip和端口号
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# server.setblocking(True)#默认是True
# server.setblocking(False) #False的话就成非阻塞了,这只是对于socket套接字来说的

# 绑定端口号,写哪个ip就要运行在哪台机器上
server.bind(ip_port)

# 最多可以连接多少个客户端
server.listen(listen_num)
while True:
    # #IO操作 在这accept的时候不能干recv的活-在这个位置进行等待,监听端口号
    conn, addr = server.accept()
    while True:
        try:
            print("接收来自客户端:",addr,'的数据')
            # IO操作-接受套接字的大小
            data = conn.recv(buffer_size)
            if len(data) == 0:break
             print(data.decode('utf-8'))
            conn.send(data.upper())
        except ConnectionResetError as e:
            # 链接错误
            break
    conn.close()
# 关闭服务器
server.close()




客户端代码

from socket import *
c = socket()
c.connect(('127.0.0.1', 8000))
print('获取套接字自己的地址:',c.getsockname())
# ConnectionRefusedError: [WinError 10061] 由于目标计算机积极拒绝,无法连接。 ---服务端没有开启!
while True:
    msg = input('>>>: ').strip()
    if not msg:continue
    # 远程的链接的关闭了话,则会提示异常
    c.send(msg.encode())
    data = c.recv(1024)
    print(data.decode())



结果:

客户端发送数据:

服务端接收数据:

再开启一个新的客户端:

此时新的客户端的是无法发出数据的,因为是阻塞的的,这个时候我们的关闭一下发送体育课的客户端连接:语文课的信息客户端才可以发出。

为测试方便,客户端的测试,我使用一个工具来进行测试:

分析:

上面的执行过程看,我们的服务端仅仅只能处理一个客户端的链接,其他的链接需要等待第一个链接关闭释放了才可以进行处理。上面的模型是阻塞IO模型,我们可以改进使用多进程或多线程的模式分配给不同的客户端单一的链接,但是这种就是也会遇到瓶颈!客户端多的话,多线程就会不断的创建了!!!

(多线程模式的阻塞IO)服务端代码

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     1示例
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/4/27
-------------------------------------------------
   修改描述-2021/4/27:         
-------------------------------------------------
"
""
import socket
from threading import Thread

# 明确配置变量
ip_port = ('127.0.0.1',8000)
# 最多可以连接多少个客户端
listen_num = 5
# 接受套接字的大小
buffer_size = 1024
# 套接字类型AF_INET, socket.SOCK_STREAM tcp协议,基于流式的协议
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 对socket的配置重用ip和端口号
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# server.setblocking(True)#默认是True
# server.setblocking(False) #False的话就成非阻塞了,这只是对于socket套接字来说的
# server.setblocking()  #默认是True
# server.setblocking(False) #False的话就成非阻塞了,这只是对于socket套接字来说的
# # 所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问内核数据准备好了没有。
# wait data # 等数据的这个阶段是不阻塞的
# copy data # 这个阶段还是要阻塞的


# 绑定端口号,写哪个ip就要运行在哪台机器上
server.bind(ip_port)

# 最多可以连接多少个客户端
server.listen(listen_num)

def handle_communicate(conn):
    while True:
        try:
            # IO操作-接受套接字的大小
            print("接收来自客户端:",addr,'的数据')
            # 在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。
            data = conn.recv(buffer_size)

            if len(data) == 0:break

            try:
                # print(line.decode('utf8'))
                # line.decode('utf8')
                # 如果发送的编码格式是  c.send(msg.encode())的话使用ut8进行解码
                # 直到等到数据已经到了操作系统,操作系统再从内核拷贝给应用程序
                print(data.decode('utf-8'))
            except:
                # 使用网络调试助手发送的数据--- 发送的是ASCII客户端的形式
                print(data.decode('gbk'))
            conn.send(data.upper())
        except ConnectionResetError as e:
            # 链接错误
            break
    conn.close()

while True:
    # #IO操作 在这accept的时候不能干recv的活-在这个位置进行等待,监听端口号
    conn, addr = server.accept()
    t = Thread(target=handle_communicate, args=(conn,))
    t.start()
# 关闭服务器
server.close()




多线程模式下的可以处理更多的客户端的链接:

但是这种模式的会不断创建线程!虽然也可以“ThreadPoolExecutor池化”。但是池化也还是有限制!

(多进程模式的阻塞IO)服务端代码:

from socket import *
from multiprocessing import Process
import random

def dealClient(conn, addr):
    while True:
        data = conn.recv(1024)
        if len(data):
            try:
                print('收到的数据', data.decode('gbk'), "来自", addr)
            except:
                # 使用网络调试助手发送的数据--- 发送的是ASCII客户端的形式
                print('收到的数据', data.decode('utf-8'), "来自", addr)
            backdata= ["你大爷的!",'你没的!',"什么东东!"]
            # print(type(data))
            # print(str.encode(b))  # 默认 encoding="utf-8"
            conn.send(random.choice(backdata).encode('gbk'))
        else:
            # 如果数据长度为0 说明客户端断开连接,此时跳出循环关闭套接字
            print('客户端%s下线了......' % (str(addr)))
            break
    conn.close()


def main():
    tcpSocket = socket(AF_INET, SOCK_STREAM)

    # 设置套接字可以地址重用
    tcpSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    serverAddr = ('127.0.0.1', 8000)
    tcpSocket.bind(serverAddr)
    tcpSocket.listen(100)
    # 设置监听队列长度,在linux中这个值没有太大意义,kernel有自己的值

    # 为了防止服务端异常退出,因此使用try...finally,确保创建的套接字可以正常关闭
    try:
        while True:
            # 在主进程中不断接收新的连接请求
            conn, addr = tcpSocket.accept()
            # 创建子进程处理已经建立好的连接,tcpSocket依旧去监听是否有新的请求。
            p1 = Process(target=dealClient, args=(conn, addr))
            p1.start()
            conn.close()
            # 创建子进程后,父进程也同样拥有newSocekt,但是子进程使用accept()返回的新套接字发送数据,因此父进程要把newSocket关闭
    except:
        pass
    finally:
        tcpSocket.close()


if __name__ == '__main__':
    main()

示例结果:

2.2 python下非阻塞式IO(一般不会使用这种模式)


非阻塞式IO中两个阶段情况:

wait data 等数据的这个阶段是---不----阻塞
copy data 拷贝阶段还是要阻塞

服务端代码:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     1示例
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/4/27
-------------------------------------------------
   修改描述-2021/4/27:         
-------------------------------------------------
"
""

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     1示例
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/4/27
-------------------------------------------------
   修改描述-2021/4/27:         
-------------------------------------------------
"
""

from socket import *
import os
print(os.getpid())

# s = socket()
# s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
# s.setblocking(False)
# s.bind(('127.0.0.1', 8000))
# s.listen()



import socket
# 明确配置变量
ip_port = ('127.0.0.1',8000)
# 最多可以连接多少个客户端
listen_num = 5
# 接受套接字的大小
buffer_size = 1024
# 套接字类型AF_INET, socket.SOCK_STREAM tcp协议,基于流式的协议
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 对socket的配置重用ip和端口号
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# server.setblocking(True)#默认是True
# server.setblocking(False) #False的话就成非阻塞了,这只是对于socket套接字来说的
server.setblocking(False) #默认是True  (如果是False,套接字里的一些阻塞操作都变成非阻塞的)
# 绑定端口号,写哪个ip就要运行在哪台机器上
server.bind(ip_port)
# 最多可以连接多少个客户端
server.listen(listen_num)


# 客户端链接对象的列表
cilent_conn_list = []
# 待删除的客户端链接列表
cilent_del_list = []
# 这种程序虽说解决了单线程并发,但是大大的占用了cpu
while True:
    try:
        # #收不到数据的时候才出异常
        conn, addr = server.accept()
        cilent_conn_list.append(conn)
        print("接收来自客户端:", addr, '的数据')
    except BlockingIOError:
        #  BlockingIOError:#在收不到数据的那段时间利用起来(利用他收不到数据的时候,才干下面的for循环)
        # print('做其他事')
        for conn in cilent_conn_list:
            # 这句很关键!socket设置阻塞后,并不会影响到recv方法,需要对每个conn再次设置超时!!!
            conn.settimeout(0)
            try:
                # 轮询的方式使CPU一直在空转,很浪费资源。
                # 循环调用recv()将大幅度推高CPU占用率;
                # -该模型会长时间占用着CPU并且不干活 让CPU不停的空转
                data = conn.recv(1024)
                if not data:
                    print("链接关闭1")
                    conn.close()  # 关闭conn
                    # 将无用的conn从r_list删除
                    cilent_del_list.append(conn)
                    continue
                try:
                    # print(line.decode('utf8'))
                    # line.decode('utf8')
                    # 如果发送的编码格式是  c.send(msg.encode())的话使用ut8进行解码
                    # 直到等到数据已经到了操作系统,操作系统再从内核拷贝给应用程序
                    print(data.decode('utf-8'))
                except:
                    # 使用网络调试助手发送的数据--- 发送的是ASCII客户端的形式
                    print(data.decode('gbk'))
                conn.send(data.upper())
            except BlockingIOError:
                continue
            except ConnectionResetError:
                # ConnectionResetError:#客户端主动的断开链接的时候(如果突然断开链接,会报错就先添加到列表里面去,完了吧链接给清除了)
                # 处理客户端主动的断开的
                conn.close()
                print("有客户端主动断开了链接!")
                cilent_del_list.append(conn)

        # 把已经关闭的链接从链接队列里面删除
        for conn in cilent_del_list:
            print("把已经关闭的链接从链接队列里面删除")
            cilent_conn_list.remove(conn)

        cilent_del_list.clear()


# 关闭服务器
server.close()



优点:服务端可以接受更多的连接。缺点:

1:关于CPU使用上:

  • data = conn.recv(1024)
  • 调用recv()函数,会将输入缓冲区中的内容拷贝到用户缓冲区。
  • 轮询的方式使CPU一直在空转,很浪费资源。
  • 循环调用recv()将大幅度推高CPU占用率;
  • 该模型会长时间占用着CPU并且不干活 让CPU不停的空转

2:关于任务处理上:

任务完成的响应延迟增大了,需每过一段时间(循环中)才去轮询一次read操作

2.3 python下多路复用IO一些说明  (IO multiplexing)

多路复用IO一些说明:

在多路复用IO的流程:

  • 1:当用户进程执行调用了select,此时整个进程会阻塞(block),

用户进程是被select这个函数block,而不是被socket IO给block

  • 2:内核(kernel)开始监听所有select负责的socket
  • 3:当任何一个socket中的数据已经准备好后,select就会立即返回。
  • 4:selec返回后用户进程还需要再调用一次read操作,从内核(kernel)中拷贝数据到用户缓冲区里面。

多路复用IO 和阻塞IO的区别:

  • 多路复用IO:需要两个的系统调用(select和recvfrom)
  • 阻塞IO:只需要一个系统调用(recvfrom)
  • 基于select的之上多路复用IO,可以同时处理多个连接

PS:

如果处理的链接数不是很多的话,select/epoll的优势不一定比多线程的阻塞IO的方式更好,可能还会有延迟。

select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。它不适用于单链接之中。

服务端代码(示例1,无队列):

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     server
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/4/28
-------------------------------------------------
   修改描述-2021/4/28:         
-------------------------------------------------
"
""

# 鉴于异步IO模型while True不停扫描高消耗CPU,所以采用IO多路复用模型
import select
import socket
#
# sk = socket.socket()
# addr = ('127.0.0.1', 8090)
# sk.setblocking(False)
# sk.bind(addr)
# sk.listen()

import socket
# 明确配置变量
ip_port = ('127.0.0.1',8000)
# 最多可以连接多少个客户端
listen_num = 5
# 接受套接字的大小
buffer_size = 1024
# 套接字类型AF_INET, socket.SOCK_STREAM tcp协议,基于流式的协议
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 对socket的配置重用ip和端口号
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# server.setblocking(True)#默认是True
# server.setblocking(False) #False的话就成非阻塞了,这只是对于socket套接字来说的
server.setblocking(False) #默认是True  (如果是False,套接字里的一些阻塞操作都变成非阻塞的)
# 绑定端口号,写哪个ip就要运行在哪台机器上
server.bind(ip_port)
# 最多可以连接多少个客户端
server.listen(listen_num)


# 客户端链接对象的列表
cilent_conn_list = []
# 待删除的客户端链接列表
cilent_del_list = []
# 这种程序虽说解决了单线程并发,但是大大的占用了cpu

read_lst = [server,]
while True:
    # select 监听请求对象,如果没有收到请求,则阻塞,此处相当于监听accept事件
    rret, wret, xret = select.select(read_lst, [], [])  # rret为响应事件对象,若响应accept事件,则返回server,若响应recv,则返回conn
    # print(rret)
    for item in rret:
        if item is server:
            conn, addr = item.accept()
            print("接收来自客户端:", addr, '的数据')
            read_lst.append(conn)
        else:
            try:
                data = item.recv(buffer_size)
                if not data:
                    read_lst.remove(item)
                    item.close()
                    continue
                try:
                    # print(line.decode('utf8'))
                    # line.decode('utf8')
                    # 如果发送的编码格式是  c.send(msg.encode())的话使用ut8进行解码
                    # 直到等到数据已经到了操作系统,操作系统再从内核拷贝给应用程序
                    print(data.decode('utf-8'))
                except:
                    # 使用网络调试助手发送的数据--- 发送的是ASCII客户端的形式
                    print(data.decode('gbk'))
                item.send(data.upper())
            except ConnectionResetError:

                read_lst.remove(item)

结果:

select监听fd变化的过程分析:

  • 用户进程创建socket对象,拷贝监听的fd到内核空间,每一个fd会对应一张系统文件表
  • 内核空间的fd响应到数据后,就会发送信号给用户进程数据已到;
  • 用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,
  • 同时作为接受数据端内核空间的数据清除,这样重新监听时fd再有新的数据又可以响应到了

select模块的优点

  • 相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,
  • 能够为多客户端提供服务。
  • 如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。

select模块的缺点:

  • select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll
  • 如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。

PS:不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现 具有较好跨平台能力的服务器会比较困难。

  • 此模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

select、poll、epoll三种多路复用IO简单总结

一个来自epoll示例(还没进行验证实践因为懒的跑linux上了)

#_*_coding:utf-8_*_
__author__ = 'Alex Li'

import socket, logging
import select, errno

logger = logging.getLogger("network-server")

def InitLog():
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler("network-server.log")
    fh.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)

    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)


if __name__ == "__main__":
    InitLog()

    try:
        # 创建 TCP socket 作为监听 socket
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    except socket.error as  msg:
        logger.error("create socket failed")

    try:
        # 设置 SO_REUSEADDR 选项
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    except socket.error as  msg:
        logger.error("setsocketopt SO_REUSEADDR failed")

    try:
        # 进行 bind -- 此处未指定 ip 地址,即 bind 了全部网卡 ip 上
        listen_fd.bind(('', 2003))
    except socket.error as  msg:
        logger.error("bind failed")

    try:
        # 设置 listen 的 backlog 数
        listen_fd.listen(10)
    except socket.error as  msg:
        logger.error(msg)

    try:
        # 创建 epoll 句柄
        epoll_fd = select.epoll()
        # 向 epoll 句柄中注册 监听 socket 的 可读 事件
        epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)
    except select.error as  msg:
        logger.error(msg)

    connections = {}
    addresses = {}
    datalist = {}
    while True:
        # epoll 进行 fd 扫描的地方 -- 未指定超时时间则为阻塞等待
        epoll_list = epoll_fd.poll()

        for fd, events in epoll_list:
            # 若为监听 fd 被激活
            if fd == listen_fd.fileno():
                # 进行 accept -- 获得连接上来 client 的 ip 和 port,以及 socket 句柄
                conn, addr = listen_fd.accept()
                logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno()))
                # 将连接 socket 设置为 非阻塞
                conn.setblocking(0)
                # 向 epoll 句柄中注册 连接 socket 的 可读 事件
                epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET)
                # 将 conn 和 addr 信息分别保存起来
                connections[conn.fileno()] = conn
                addresses[conn.fileno()] = addr
            elif select.EPOLLIN & events:
                # 有 可读 事件激活
                datas = ''
                while True:
                    try:
                        # 从激活 fd 上 recv 10 字节数据
                        data = connections[fd].recv(10)
                        # 若当前没有接收到数据,并且之前的累计数据也没有
                        if not data and not datas:
                            # 从 epoll 句柄中移除该 连接 fd
                            epoll_fd.unregister(fd)
                            # server 侧主动关闭该 连接 fd
                            connections[fd].close()
                            logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
                            break
                        else:
                            # 将接收到的数据拼接保存在 datas 中
                            datas += data
                    except socket.error as  msg:
                        # 在 非阻塞 socket 上进行 recv 需要处理 读穿 的情况
                        # 这里实际上是利用 读穿 出 异常 的方式跳到这里进行后续处理
                        if msg.errno == errno.EAGAIN:
                            logger.debug("%s receive %s" % (fd, datas))
                            # 将已接收数据保存起来
                            datalist[fd] = datas
                            # 更新 epoll 句柄中连接d 注册事件为 可写
                            epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
                            break
                        else:
                            # 出错处理
                            epoll_fd.unregister(fd)
                            connections[fd].close()
                            logger.error(msg)
                            break
            elif select.EPOLLHUP & events:
                # 有 HUP 事件激活
                epoll_fd.unregister(fd)
                connections[fd].close()
                logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1]))
            elif select.EPOLLOUT & events:
                # 有 可写 事件激活
                sendLen = 0
                # 通过 while 循环确保将 buf 中的数据全部发送出去
                while True:
                    # 将之前收到的数据发回 client -- 通过 sendLen 来控制发送位置
                    sendLen += connections[fd].send(datalist[fd][sendLen:])
                    # 在全部发送完毕后退出 while 循环
                    if sendLen == len(datalist[fd]):
                        break
                # 更新 epoll 句柄中连接 fd 注册事件为 可读
                epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
            else:
                # 其他 epoll 事件不进行处理
                continue

高级且高效的I/O复用selectors模块示例:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     __init__.py
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/4/28
-------------------------------------------------
   修改描述-2021/4/28:         
-------------------------------------------------
"
""
import selectors
import socket

sel = selectors.DefaultSelector()


def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('接收来自客户端链接', conn, 'from', addr,mask)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read#新连接注册read回调函数


def read(conn, mask):
    data = conn.recv(buffer_size)  # Should be ready
    if data:
        # print('回复数据', repr(data), 'to', conn)
        try:
            # print(line.decode('utf8'))
            # line.decode('utf8')
            # 如果发送的编码格式是  c.send(msg.encode())的话使用ut8进行解码
            # 直到等到数据已经到了操作系统,操作系统再从内核拷贝给应用程序
            print('收到的数据',data.decode('utf-8'),"来自",conn)
        except:
            # 使用网络调试助手发送的数据--- 发送的是ASCII客户端的形式
            print('收到的数据',data.decode('gbk'),"来自",conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()


# sock = socket.socket()

import socket
# 明确配置变量
ip_port = ('127.0.0.1',8000)
# 最多可以连接多少个客户端
listen_num = 5
# 接受套接字的大小
buffer_size = 1024
# 套接字类型AF_INET, socket.SOCK_STREAM tcp协议,基于流式的协议
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 对socket的配置重用ip和端口号
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# server.setblocking(True)#默认是True
# server.setblocking(False) #False的话就成非阻塞了,这只是对于socket套接字来说的
server.setblocking(False) #默认是True  (如果是False,套接字里的一些阻塞操作都变成非阻塞的)
# 绑定端口号,写哪个ip就要运行在哪台机器上
server.bind(ip_port)
# 最多可以连接多少个客户端
server.listen(listen_num)

# sock = socket.socket()
# sock.bind(('localhost', 9999))
# sock.listen(100)
# sock.setblocking(False)
sel.register(server, selectors.EVENT_READ, accept)

while True:
    events = sel.select() #默认阻塞,有活动连接就返回活动的连接列表
    for key, mask in events:
        callback = key.data #accept
        callback(key.fileobj, mask) #key.fileobj=  文件句柄

gevent实现单线程下的多socket并发示例:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   文件名称 :     serve
   文件功能描述 :   功能描述
   创建人 :       小钟同学
   创建时间 :          2021/4/28
-------------------------------------------------
   修改描述-2021/4/28:         
-------------------------------------------------
"
""

import sys
import socket
import time
import gevent
from gevent import socket, monkey

monkey.patch_all()

import socket
# 明确配置变量
ip_port = ('127.0.0.1',8000)
# 最多可以连接多少个客户端
listen_num = 5
# 接受套接字的大小
buffer_size = 1024

# 对socket的配置重用ip和端口号

def server(port):
    # s = socket.socket()
    # 套接字类型AF_INET, socket.SOCK_STREAM tcp协议,基于流式的协议
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # server.bind(('0.0.0.0', port))
    server.bind(('127.0.0.1', port))
    server.listen(listen_num)
    while True:
        conn, addr = server.accept()
        print("接收来自客户端:", addr, '的数据')
        gevent.spawn(handle_request, conn)

def handle_request(conn):
    try:
        while True:
            data = conn.recv(buffer_size)
            try:
                # print(line.decode('utf8'))
                # line.decode('utf8')
                # 如果发送的编码格式是  c.send(msg.encode())的话使用ut8进行解码
                # 直到等到数据已经到了操作系统,操作系统再从内核拷贝给应用程序
                # print(data.decode('utf-8'))
                # print(data.decode('gbk'))
                print('收到的数据', data.decode('gbk'), "来自", conn)
            except:
                # 使用网络调试助手发送的数据--- 发送的是ASCII客户端的形式
                print('收到的数据', data.decode('utf-8'), "来自", conn)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(8000)


TCP长连接保持存活的机制

已多线程阻塞IO为示例:

# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : 1示例
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/4/27
-------------------------------------------------
修改描述-2021/4/27:
-------------------------------------------------
"""
import socket
from threading import Thread

# 明确配置变量
ip_port = ('127.0.0.1', 8000)
# 最多可以连接多少个客户端
listen_num = 5
# 接受套接字的大小
buffer_size = 1024
# 套接字类型AF_INET, socket.SOCK_STREAM tcp协议,基于流式的协议
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 对socket的配置重用ip和端口号

# 保护IP头部
server.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
# 设置混杂模式
server.ioctl(socket.SIO_KEEPALIVE_VALS,
(1, # 开启保活机制
60 * 1000, # 1分钟后如果对象还没反应,则开始探测连接是否存在
10 * 1000 # 10 秒钟探测一次,默认弹窗10次,失败则断开
)
)

# 绑定端口号,写哪个ip就要运行在哪台机器上
server.bind(ip_port)

# 最多可以连接多少个客户端
server.listen(listen_num)


def handle_communicate(conn):
while True:
try:
# IO操作-接受套接字的大小
print("接收来自客户端:", addr, '的数据')
# 在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。
data = conn.recv(buffer_size)

if len(data) == 0: break

try:
# print(line.decode('utf8'))
# line.decode('utf8')
# 如果发送的编码格式是 c.send(msg.encode())的话使用ut8进行解码
# 直到等到数据已经到了操作系统,操作系统再从内核拷贝给应用程序
print(data.decode('utf-8'))
except:
# 使用网络调试助手发送的数据--- 发送的是ASCII客户端的形式
print(data.decode('gbk'))
conn.send(data.upper())
except ConnectionResetError as e:
# 链接错误
break
conn.close()


while True:
# #IO操作 在这accept的时候不能干recv的活-在这个位置进行等待,监听端口号
conn, addr = server.accept()
t = Thread(target=handle_communicate, args=(conn,))
t.start()
# 关闭服务器
server.close()



关键代码:

参考资料:

1: https://www.cnblogs.com/sunlong88/p/9485926.html

2: UNIX网络编程 卷1:套接字联网API

个人其他博客地址

简书:https://www.jianshu.com/u/d6960089b087

掘金:https://juejin.cn/user/2963939079225608

小钟同学 | 文 【原创】| QQ:308711822

  • 1:本文相关描述主要是个人的认知和见解,如有不当之处,还望各位大佬指正。
  • 2:关于文章内容,有部分内容参考自互联网,如有链接会声明标注;如没有及时标注备注的链接的,如有侵权请联系,我会立即删除处理哟。


文章转载自小儿来一壶枸杞酒泡茶,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论