Python 基于队列实现 tcp socket 连接池的方法

连接池实现

socket_pool.py

# -*- coding:utf-8 -*-
import socket
import time
import threading
import os
import logging
import traceback
from queue import queue, empty
_logger = logging.getlogger('mylogger')
class socketpool:
    def __init__(self, host, port, min_connections=10, max_connections=10):
        '''
        初始化socket连接池
        :param host: 目标主机地址
        :param port: 目标端口号
        :param min_connections: 最小连接数
        :param max_connections: 最大连接数
        '''
        self.host = host
        self.port = port
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.busy_sockets_dict = {} # 存放从连接池取出的socket的id
        self._sock_lock = threading.lock()  # 线程锁保证计数正确
        self._pool = queue(max_connections)  # 基于线程安全的队列存储连接
        self._lock = threading.lock()        # 线程锁保证资源安全:
        self._init_pool()                    # 预创建连接
        self._start_health_check()           # 启动连接健康检查线程
    def _init_pool(self):
        '''预创建连接并填充到池中'''
        for _ in range(self.min_connections):
            sock = self._create_socket()
            self._pool.put(sock)
    def _create_socket(self):
        '''创建新的socket连接'''
        sock = socket.socket(socket.af_inet, socket.sock_stream)
        try:
            sock.connect((self.host, self.port))
            return sock
        except socket.error as e:
            raise connectionerror(f'failed to connect: {e}')  # 连接失败抛出异常
    def _start_health_check(self):
        '''启动后台线程定期检查连接有效性'''
        def check():
            while true:
                with self._lock:
                    for _ in range(self._pool.qsize()):
                        sock = self._pool.get()
                        self.busy_sockets_dict[sock] = 1
                        try:
                            sock.send(b'ping')  # 发送心跳包验证连接状态
                            # 以下 11 为服务端返回数据字节长度,不能乱写,否则会导致获取非健康检查响应报文数据存在多余内容,不符合格式,从而导致数据解析问题
                            sock.recv(11)
                            self._pool.put(sock)
                            self.busy_sockets_dict.pop(sock)
                        except (socket.error, connectionreseterror):
                            _logger.error('socket连接健康检查出错:%s, 关闭失效连接并创建新连接替换' % traceback.format_exc())
                            sock.close()  # 关闭失效连接并创建新连接替换
                            self.busy_sockets_dict.pop(sock)
                            new_sock = self._create_socket()
                            self._pool.put(new_sock)
                    # 如果sock数量小于最小数量,则补充
                    for _ in range(0, self.min_connections - self._pool.qsize()):
                        new_sock = self._create_socket()
                        self._pool.put(new_sock)
                time.sleep(60)  # 每60秒检查一次
        threading.thread(target=check, daemon=true).start()
    def get_connection(self):
        '''
        从池中获取一个可用连接
        :return: socket对象
        '''
        with self._sock_lock:
            if self._pool.empty():
                if len(self.busy_sockets_dict.keys()) < self.max_connections:
                    new_sock = self._create_socket()
                    self.busy_sockets_dict[new_sock] = 1
                    return new_sock
                else:
                    raise empty('no available connections in pool')
            else:
                try:
                    sock = self._pool.get(block=false)
                    self.busy_sockets_dict[sock] = 1
                    return sock
                except exception:
                    _logger.error('获取socket连接出错:%s' % traceback.format_exc())
                    raise
    def release_connection(self, sock):
        '''
        将连接归还到池中
        :param sock: 待归还的socket对象
        '''
        if not sock._closed:
            self._pool.put(sock)
        if sock in self.busy_sockets_dict:
            self.busy_sockets_dict.pop(sock)
    def close_all(self):
        '''关闭池中所有连接'''
        while not self._pool.empty():
            sock = self._pool.get()
            sock.close()
            self.busy_sockets_dict.pop(sock.id)
        self.busy_sockets_dict = {} # 兜底
host = os.environ.get('modbus_tcp_server_host', '127.0.0.1')
port = int(os.environ.get('modbus_tcp_server_port', '9000'))
min_connections = int(os.environ.get('django_socket_pool_max_connections', '10'))
max_connections = int(os.environ.get('django_socket_pool_max_connections', '100'))
socketpool = socketpool(host, port, min_connections, max_connections)

使用连接池

from socket_pool import socketpool
def send_socket_msg(data):
    global socketpool
    try:
        sock = none
        # 获取连接(支持超时控制)
        sock = socketpool.get_connection()
        # 发送数据
        sock.sendall(data.encode('utf-8'))
    except exception:
        error_msg = '发送消息出错:%s' % traceback.format_exc()
        _logger.error(error_msg)
        if sock is not none:
            sock.close()
            socketpool.release_connection(sock)
        return send_socket_msg(data)
    response = ''
    try:
        while true:
            chunk = sock.recv(4096)
            chunk = chunk.decode('utf-8')
            response += chunk
            if response.endswith(''):
                response = response.rstrip('')
                return {'success':true, 'message':response}
    except exception:
        error_msg = '获取消息出错:%s' % traceback.format_exc()
        _logger.error(error_msg)
        return {'success':false, 'message': error_msg}
    finally:
        # 必须归还连接!
        socketpool.release_connection(sock)

到此这篇关于python 基于队列实现 tcp socket 连接池的文章就介绍到这了,更多相关python tcp socket 连接池内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

发布于 2025-05-07 22:20:38
分享
海报
173
上一篇:SpringBoot配置文件之properties和yml的使用 下一篇:一文带你搞懂Redis Stream的6种消息处理模式
目录

    推荐阅读

    忘记密码?

    图形验证码