Python 基于队列实现 tcp socket 连接池的方法
连接池实现
socket_pool.py
# -*- coding:utf-8 -*-
import socket
import time
import threading
import os
import logging
import traceback
from queue import queue, empty
_logger = logging.getlogger('mylogger')
class socketpool:
def __init__(self, host, port, min_connections=10, max_connections=10):
'''
初始化socket连接池
:param host: 目标主机地址
:param port: 目标端口号
:param min_connections: 最小连接数
:param max_connections: 最大连接数
'''
self.host = host
self.port = port
self.min_connections = min_connections
self.max_connections = max_connections
self.busy_sockets_dict = {} # 存放从连接池取出的socket的id
self._sock_lock = threading.lock() # 线程锁保证计数正确
self._pool = queue(max_connections) # 基于线程安全的队列存储连接
self._lock = threading.lock() # 线程锁保证资源安全:
self._init_pool() # 预创建连接
self._start_health_check() # 启动连接健康检查线程
def _init_pool(self):
'''预创建连接并填充到池中'''
for _ in range(self.min_connections):
sock = self._create_socket()
self._pool.put(sock)
def _create_socket(self):
'''创建新的socket连接'''
sock = socket.socket(socket.af_inet, socket.sock_stream)
try:
sock.connect((self.host, self.port))
return sock
except socket.error as e:
raise connectionerror(f'failed to connect: {e}') # 连接失败抛出异常
def _start_health_check(self):
'''启动后台线程定期检查连接有效性'''
def check():
while true:
with self._lock:
for _ in range(self._pool.qsize()):
sock = self._pool.get()
self.busy_sockets_dict[sock] = 1
try:
sock.send(b'ping') # 发送心跳包验证连接状态
# 以下 11 为服务端返回数据字节长度,不能乱写,否则会导致获取非健康检查响应报文数据存在多余内容,不符合格式,从而导致数据解析问题
sock.recv(11)
self._pool.put(sock)
self.busy_sockets_dict.pop(sock)
except (socket.error, connectionreseterror):
_logger.error('socket连接健康检查出错:%s, 关闭失效连接并创建新连接替换' % traceback.format_exc())
sock.close() # 关闭失效连接并创建新连接替换
self.busy_sockets_dict.pop(sock)
new_sock = self._create_socket()
self._pool.put(new_sock)
# 如果sock数量小于最小数量,则补充
for _ in range(0, self.min_connections - self._pool.qsize()):
new_sock = self._create_socket()
self._pool.put(new_sock)
time.sleep(60) # 每60秒检查一次
threading.thread(target=check, daemon=true).start()
def get_connection(self):
'''
从池中获取一个可用连接
:return: socket对象
'''
with self._sock_lock:
if self._pool.empty():
if len(self.busy_sockets_dict.keys()) < self.max_connections:
new_sock = self._create_socket()
self.busy_sockets_dict[new_sock] = 1
return new_sock
else:
raise empty('no available connections in pool')
else:
try:
sock = self._pool.get(block=false)
self.busy_sockets_dict[sock] = 1
return sock
except exception:
_logger.error('获取socket连接出错:%s' % traceback.format_exc())
raise
def release_connection(self, sock):
'''
将连接归还到池中
:param sock: 待归还的socket对象
'''
if not sock._closed:
self._pool.put(sock)
if sock in self.busy_sockets_dict:
self.busy_sockets_dict.pop(sock)
def close_all(self):
'''关闭池中所有连接'''
while not self._pool.empty():
sock = self._pool.get()
sock.close()
self.busy_sockets_dict.pop(sock.id)
self.busy_sockets_dict = {} # 兜底
host = os.environ.get('modbus_tcp_server_host', '127.0.0.1')
port = int(os.environ.get('modbus_tcp_server_port', '9000'))
min_connections = int(os.environ.get('django_socket_pool_max_connections', '10'))
max_connections = int(os.environ.get('django_socket_pool_max_connections', '100'))
socketpool = socketpool(host, port, min_connections, max_connections)
使用连接池
from socket_pool import socketpool
def send_socket_msg(data):
global socketpool
try:
sock = none
# 获取连接(支持超时控制)
sock = socketpool.get_connection()
# 发送数据
sock.sendall(data.encode('utf-8'))
except exception:
error_msg = '发送消息出错:%s' % traceback.format_exc()
_logger.error(error_msg)
if sock is not none:
sock.close()
socketpool.release_connection(sock)
return send_socket_msg(data)
response = ''
try:
while true:
chunk = sock.recv(4096)
chunk = chunk.decode('utf-8')
response += chunk
if response.endswith(''):
response = response.rstrip('')
return {'success':true, 'message':response}
except exception:
error_msg = '获取消息出错:%s' % traceback.format_exc()
_logger.error(error_msg)
return {'success':false, 'message': error_msg}
finally:
# 必须归还连接!
socketpool.release_connection(sock)
到此这篇关于python 基于队列实现 tcp socket 连接池的文章就介绍到这了,更多相关python tcp socket 连接池内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
推荐阅读
-
一文教你Python如何快速精准抓取网页数据
本文将使用requests和beautifulsoup这两个流行的库来实现。1.准备工作首先安装必要的库:pipinst...
-
使用Python实现IP地址和端口状态检测与监控
-
基于Python打造一个智能单词管理神器
-
Python实现微信自动锁定工具
-
使用Python创建一个功能完整的Windows风格计算器程序
python实现windows系统计算器程序(含高级功能)下面我将介绍如何使用python创建一个功能完整的windows风格计...
-
Python开发文字版随机事件游戏的项目实例
随机事件游戏是一种通过生成不可预测的事件来增强游戏体验的类型。在这类游戏中,玩家必须应对随机发生的情况,这些情况可能会影响他们的资...
-
使用Pandas实现Excel中的数据透视表的项目实践
引言在数据分析中,数据透视表是一种非常强大的工具,它可以帮助我们快速汇总、分析和可视化大量数据。虽然excel提供了内置的数据透...
-
Pandas利用主表更新子表指定列小技巧
一、前言工作的小技巧,利用pandas读取主表和子表,利用主表的指定列,更新子表的指定列。案例:主表:uidname0...
-
Pandas中统计汇总可视化函数plot()的使用
-
Python中tensorflow的argmax()函数的使用小结
在tensorflow中,argmax()函数是一个非常重要的操作,它用于返回给定张量(tensor)沿指定轴的最大值的索引。这个...
