Python 实战:从零实现 SOCKS5 代理(附完整示例与解析)

从零理解并实现一个简约 SOCKS5 代理(以 Python 为例)

当你在网络工具链里需要一个可控、轻量、可调试的代理时,SOCKS5 是非常合适的选择。它本身协议简单、支持域名解析和多种认证方式,常用于翻墙、流量转发或测试中间人场景。下面以实现思路和一份可运行的 Python 示例为核心,带你从握手到数据转发一步步剖析,实现一个仅支持无认证(NO AUTH)和 CONNECT 命令的 SOCKS5 代理。

为什么从零实现有价值

市面上有很多成熟的 SOCKS5 实现(例如 Shadowsocks、Dante、sslsocks 等),但是自己实现一遍有助于:

  • 彻底理解握手流程、请求格式和返回码含义;
  • 掌握如何在 TCP 层做透明转发与错误处理;
  • 便于扩展鉴权、UDP 转发、异步优化或链式代理功能。

协议要点回顾(实现前必须弄清的部分)

实现 SOCKS5 需要重点掌握以下二者的字节级交互:

1) 握手(Method Negotiation)

  • 客户端发送:VER(0x05), NMETHODS, METHODS[]。
  • 服务端回复:VER(0x05), METHOD(选择的认证方式,比如 0x00 表示无认证)。

2) 请求(Request)

  • 客户端请求格式:VER(0x05), CMD(0x01 CONNECT / 0x02 BIND / 0x03 UDP ASSOCIATE), RSV(0x00), ATYP(0x01 IPv4 / 0x03 Domain / 0x04 IPv6), DST.ADDR, DST.PORT(2字节)。
  • 服务端回复格式:VER(0x05), REP(返回码), RSV(0x00), ATYP, BND.ADDR, BND.PORT。

在实现中通常只支持 CMD=CONNECT 与 ATYP=IPv4/Domain,跳过复杂的 UDP 和 BIND 场景。

实现思路与关键点

一个简洁实现可按以下模块切分:

  • 主 TCP 监听循环:accept 新连接并交给工作线程或协程处理。
  • 握手处理:解析 NMETHODS 与 METHODS,返回 0x00(NO AUTH)或失败。
  • 请求解析:读取 ATYP 决定如何解析目标地址(IPv4/Domain/IPv6),读取端口并连接目标服务器。
  • 回复客户端:在与目标建立连接后,发送 REP=0x00(成功)以及绑定地址信息。
  • 双向转发:在客户端与目标之间转发数据(可用两个线程或非阻塞 I/O/asyncio 实现)。
  • 错误处理与超时:对连接失败、超时、协议错误返回合适的 REP 值并关闭连接。

Python 示例(支持 NO AUTH + CONNECT,基于线程)

#!/usr/bin/env python3
import socket
import threading
import struct

LISTEN_ADDR = '0.0.0.0'
LISTEN_PORT = 1080
BUFFER_SIZE = 4096

def recv_all(sock, n):
    data = b''
    while len(data) < n:
        chunk = sock.recv(n - len(data))
        if not chunk:
            return None
        data += chunk
    return data

def handle_client(client_sock, client_addr):
    try:
        # 握手
        header = recv_all(client_sock, 2)
        if not header or header[0] != 0x05:
            client_sock.close(); return
        nmethods = header[1]
        methods = recv_all(client_sock, nmethods)
        if methods is None:
            client_sock.close(); return
        # 选择 NO AUTH (0x00) 或者拒绝 (0xff)
        if 0x00 in methods:
            client_sock.sendall(b'x05x00')
        else:
            client_sock.sendall(b'x05xff'); client_sock.close(); return

        # 请求
        req_header = recv_all(client_sock, 4)
        if not req_header or req_header[0] != 0x05:
            client_sock.close(); return
        cmd = req_header[1]
        atyp = req_header[3]

        if cmd != 0x01:  # 仅支持 CONNECT
            client_sock.sendall(b'x05x07x00x01' + b'x00'6)
            client_sock.close(); return

        if atyp == 0x01:  # IPv4
            addr = recv_all(client_sock, 4)
            dst_addr = socket.inet_ntoa(addr)
        elif atyp == 0x03:  # 域名
            dom_len = ord(recv_all(client_sock, 1))
            dom = recv_all(client_sock, dom_len).decode()
            dst_addr = dom
        elif atyp == 0x04:  # IPv6(简单示意)
            addr = recv_all(client_sock, 16)
            dst_addr = socket.inet_ntop(socket.AF_INET6, addr)
        else:
            client_sock.close(); return

        port_bytes = recv_all(client_sock, 2)
        dst_port = struct.unpack('!H', port_bytes)[0]

        # 连接目标
        try:
            remote = socket.create_connection((dst_addr, dst_port), timeout=10)
        except Exception:
            client_sock.sendall(b'x05x05x00x01' + b'x00'6)
            client_sock.close(); return

        # 发送成功应答:这里用 IPv4 0.0.0.0:0 作为 BND.ADDR/BND.PORT 的示例
        reply = b'x05x00x00x01' + socket.inet_aton('0.0.0.0') + struct.pack('!H', 0)
        client_sock.sendall(reply)

        # 双向转发
        def forward(src, dst):
            try:
                while True:
                    data = src.recv(BUFFER_SIZE)
                    if not data:
                        break
                    dst.sendall(data)
            except:
                pass
            finally:
                try: dst.shutdown(socket.SHUT_WR)
                except: pass

        t1 = threading.Thread(target=forward, args=(client_sock, remote))
        t2 = threading.Thread(target=forward, args=(remote, client_sock))
        t1.start(); t2.start()
        t1.join(); t2.join()
    finally:
        client_sock.close()
        try: remote.close()
        except: pass

def main():
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((LISTEN_ADDR, LISTEN_PORT))
    srv.listen(128)
    print('SOCKS5 server listening on %s:%d' % (LISTEN_ADDR, LISTEN_PORT))
    try:
        while True:
            c, addr = srv.accept()
            threading.Thread(target=handle_client, args=(c, addr), daemon=True).start()
    finally:
        srv.close()

if __name__ == '__main__':
    main()

代码要点说明

上面的实现遵循最小可用性原则:

  • 仅支持 SOCKS5、无认证(METHOD=0x00)以及 CONNECT 命令;
  • 解析域名(ATYP=0x03)时,先读长度再读域名;
  • 建立到目标的 TCP 连接后,向客户端返回成功应答;
  • 使用两个线程分别做 client→remote 与 remote→client 的流量转发;
  • 错误时返回合适的 REP(示例中只展示了部分常见返回码)。

常见限制与可行的改进方向

当前示例是教学与调试级别的实现,若用于生产或更复杂场景,建议考虑:

  • 并发模型:用 asyncio 或基于 epoll 的事件循环替代线程,以提高并发性能与资源利用;
  • 认证扩展:实现用户名/密码(METHOD=0x02)或者基于证书的认证;
  • UDP 支持:实现 UDP ASSOCIATE 命令以支持 DNS over UDP 或 UDP-based 应用;
  • 权限与流量控制:白名单、限速、连接速率限制与黑白名单日志;
  • 安全强化:防止未授权滥用(如匿名开放代理),使用 TLS 包装控制连接或放置在内网中接受授权访问。

测试与验证要点

部署后,用常见工具(如 curl、ssh -D 或浏览器配置 SOCKS5)进行验证。注意用抓包(tcpdump/wireshark)检查握手数据包是否符合协议;对域名解析是否由代理端完成(若 ATYP=Domain,应由代理解析)要特别留意。

性能与安全小贴士

在高并发场景下,避免为每个连接启动过多线程;使用线程池或异步 I/O 可以降低上下文切换成本。为防止代理被公开滥用,应默认绑定到内网地址、启用认证或配合防火墙规则限制来源 IP。

通过手写实现,你不仅能掌握 SOCKS5 的协议细节,还能根据实际需求定制行为,比如附加请求日志、注入限速或链式代理等。上述示例适合作为学习与中小规模内网代理的起点,后续可逐步加入更多功能。

© 版权声明
THE END
喜欢就支持一下吧
分享
评论 抢沙发

请登录后发表评论

    暂无评论内容