高并发下数据库连接使用方案:

  1.使用已有数据库连接池(下文有介绍)

  2.每个执行单元建立一次连接独享;(效率超低,难以维护,不建议使用)

  3.手动维护线程池(本人在MAC测试目前效率最高);

实测代码案例:

  1 import time
  2 import pymysql
  3 from dbutils import persistent_db, pooled_db, simple_pooled_db, steady_db
  4 from multiprocessing.pool import ThreadPool
  5 from abc import abstractmethod,ABCMeta
  6 from queue import Queue
  7 # print(help(pooled_db.PooledDB))
  8 # pooled_db 线程安全
  9 # print(pymysql.threadsafety)
 10 
 11 db_conf = {"host": "127.0.0.1",
 12            "port": 3306,
 13            "user": "root",
 14            "password": "123456",
 15            "database": "userinfo"
 16            }
 17 
 18 
 19 class TaskConsumer(metaclass=ABCMeta):
 20     @abstractmethod
 21     def _task_worker(self, i):
 22         pass
 23 
 24     def consumer(self):
 25         task_pool = ThreadPool(self.task_pool_num)
 26         start_time = time.time()
 27         for i in range(self.task_num):
 28             task_pool.apply_async(func=self._task_worker, args=(i,))
 29         task_pool.close()
 30         task_pool.join()
 31         return time.time() - start_time
 32 
 33     def db_read(self, cur, sql, i):
 34         # time.sleep(2)
 35         cur.execute(sql)
 36         print(i, cur.fetchall())
 37 
 38 class DbPoolTest(TaskConsumer):
 39     def __init__(self, task_num, task_pool_num, max_conn, blocking=True, **db_conf):
 40         self.task_num = task_num
 41         self.task_pool_num = task_pool_num
 42         self.max_conn=max_conn
 43         self.blocking = blocking
 44         self.kwargs = db_conf
 45         self._get_db_pool()
 46 
 47     def _get_db_pool(self):
 48         self.db_pool = pooled_db.PooledDB(
 49             creator=pymysql,  # 连接对象或符合DB-API 2的数据库模块(使用链接数据库的模块)
 50             mincached=self.max_conn,  # 池中空闲连接的初始数量(默认值为0表示启动时未建立连接)
 51             maxcached=0,  # 池中最大空闲连接数(默认值0或None表示池大小不受限制)
 52             maxshared=0,
 53             # 允许的最大共享连接数(默认值为0或None表示所有连接都是专用的);达到此最大数目后,如果已将连接请求为可共享的,则将共享它们。因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
 54             maxconnections=self.max_conn,  # 通常允许的最大连接数(默认值为0或无表示任何数量的连接)
 55             blocking=self.blocking,  # 确定超出最大值时的行为(如果将其设置为true,阻塞并等待直到连接减少,但默认情况下将报告错误)
 56             maxusage=None,  # 单个连接的最大重用次数(默认值为0或无表示无限重用);当达到连接的最大使用次数时,该连接将自动重置(关闭并重新打开)
 57             setsession=None,  # 可用于准备会话的SQL命令的可选列表,例如 [“将日期样式设置为德语”,...]
 58             reset=True,  # 将连接返回池时应如何重置(False或None为以begin()开始的回滚事务,出于安全考虑,默认值True始终触发回滚)
 59             failures=None,  # 如果默认(OperationalError,InternalError)不足,则应为其应用连接故障转移机制的可选异常类或异常类的元组
 60             ping=1,
 61             # 可选标志,用于控制何时使用ping()方法检查连接(如果该方法可用)(0 = None = never, 1 = default = whenever it is requested,2 = when a cursor is created, 4 = when a query is executed, 7 = always)
 62             **self.kwargs)
 63 
 64     def _task_worker(self, i):
 65         conn = self.db_pool.connection()
 66         cur = conn.cursor()
 67         sql = "select * from username;"
 68         self.db_read(cur, sql, i)
 69         cur.close()
 70         conn.close()
 71 
 72     def close(self):
 73         self.db_pool.close()
 74 
 75 class PymysqlTest(TaskConsumer):
 76     def __init__(self,task_num, task_pool_num, **db_conf):
 77         self.task_num = task_num
 78         self.task_pool_num = task_pool_num
 79         self.kwargs = db_conf
 80 
 81     def _task_worker(self, i):
 82         conn = pymysql.Connect(**self.kwargs)
 83         cur = conn.cursor()
 84         sql = "select * from username;"
 85         self.db_read(cur, sql, i)
 86         cur.close()
 87         conn.close()
 88 
 89 
 90 class PymysqlPoolTest(TaskConsumer):
 91     def __init__(self, task_num, task_pool_num, max_conn, blocking=True, **db_conf):
 92         self.task_num = task_num
 93         self.task_pool_num = task_pool_num
 94         self.max_conn = max_conn
 95         self.blocking = blocking
 96         self.kwargs = db_conf
 97         self._get_pymysql_pool()
 98 
 99     def _get_pymysql_pool(self):
100         self.queue_pool = Queue(maxsize=self.max_conn)
101         for i in range(self.max_conn):
102             self.queue_pool.put(pymysql.Connect(**self.kwargs))
103 
104     def _task_worker(self, i):
105         conn = self.queue_pool.get()
106         cur = conn.cursor()
107         sql = "select * from username;"
108         self.db_read(cur, sql, i)
109         cur.close()
110         self.queue_pool.put(conn)
111 
112     def close(self):
113         while not self.queue_pool.empty():
114             self.queue_pool.get().close()
115 
116 if __name__ == '__main__':
117     TASK_NUM = 10000
118     TASK_POOL_NUM = 100
119 
120 
121     # 方案1:数据库连接池方案(建议设置blocking=True,maxconnections最大连接数量)
122     start_time_dbpool=time.time()
123     DB_POOL_NUM = 100 #DB_POOL_NUM>=TASK_POOL_NUM任务执行时间基本保持不变
124     db_pool_test = DbPoolTest(TASK_NUM, TASK_POOL_NUM, DB_POOL_NUM,**db_conf)
125     # start_time_dbpool = time.time()
126     time_dbpool_task = db_pool_test.consumer()
127     db_pool_test.close()
128     time_dbpool = (time.time()-start_time_dbpool, time_dbpool_task)
129 
130 
131     # 方案2:每个线程池中手动建立一个连接
132     start_time_pymysql = time.time()
133     pymysql_test = PymysqlTest(TASK_NUM, TASK_POOL_NUM, **db_conf)
134     time_pymysql_task = pymysql_test.consumer()
135     time_pymysql = (time.time()-start_time_pymysql, time_pymysql_task)
136 
137     # 方案3:手动维护数据库连接池
138     start_time_pymysqlpool = time.time()
139     DB_POOL_NUM = 100  # DB_POOL_NUM>=TASK_POOL_NUM任务执行时间基本保持不变
140     pymysql_pool_test = PymysqlPoolTest(TASK_NUM, TASK_POOL_NUM, DB_POOL_NUM, **db_conf)
141     # start_time_dbpool = time.time()
142     time_pymysqlpool_task = pymysql_pool_test.consumer()
143     pymysql_pool_test.close()
144     time_pymysqlpool = (time.time() - start_time_pymysqlpool, time_pymysqlpool_task)
145 
146 
147     print("数据库连接池方案:", time_dbpool)
148     print("线程内手动连接方案:", time_pymysql)
149     print("手动维护数据库连接池方案:", time_pymysqlpool)

并发中数据库连接的三种方案

 

 

DBUtils实际上是一个包含两个子模块的Python包,一个用于连接DB-API 2模块,另一个用于连接典型的PyGreSQL模块。

全局的DB-API 2变量
SteadyDB.py 用于稳定数据库连接
PooledDB.py 连接池
PersistentDB.py 维持持续的数据库连接
SimplePooledDB.py 简单连接池

 

典型的 PyGreSQL 变量
SteadyPg.py 稳定PyGreSQL连接
PooledPg.py PyGreSQL连接池
PersistentPg.py 维持持续的PyGreSQL连接
SimplePooledPg.py 简单的PyGreSQL连接池

 

SteadyDB

DBUtils.SteadyDB 是一个模块实现了”强硬”的数据库连接,基于DB-API 2建立的原始连接。一个”强硬”的连接意味着在连接关闭之后,或者使用次数操作限制时会重新连接。

一个典型的例子是数据库重启时,而你的程序仍然在运行并需要访问数据库,或者当你的程序连接了一个防火墙后面的远程数据库,而防火墙重启时丢失了状态时。

一般来说你不需要直接使用 SteadyDB 它只是给接下来的两个模块提供基本服务, PersistentDB 和 PooledDB 。

SimplePooledDB

DBUtils.SimplePooledDB 是一个非常简单的数据库连接池实现。他比完善的 PooledDB 模块缺少很多功能

PersistentDB

DBUtils.PersistentDB 实现了强硬的、线程安全的、顽固的数据库连接,使用DB-API 2模块。

当一个线程首次打开一个数据库连接时,一个连接会打开并仅供这个线程使用。当线程关闭连接时,连接仍然持续打开供这个线程下次请求时使用这个已经打开的连接。连接在线程死亡时自动关闭。

简单的来说 PersistentDB 尝试重用数据库连接来提高线程化程序的数据库访问性能,并且他确保连接不会被线程之间共享。

因此, PersistentDB 可以在底层DB-API模块并非线程安全的时候同样工作的很好,并且他会在其他线程改变数据库会话或者使用多语句事务时同样避免问题的发生。

PooledDB

DBUtils.PooledDB 实现了一个强硬的、线程安全的、有缓存的、可复用的数据库连接,使用任何DB-API 2模块。

PooledDB 可以在不同线程之间共享打开的数据库连接。这在你连接并指定 maxshared 参数,并且底层的DB-API 2接口是线程安全才可以,但是你仍然可以使用专用数据库连接而不在线程之间共享连接。除了共享连接以外,还可以设立一个至少 mincached 的连接池,并且最多允许使用 maxcached 个连接,这可以同时用于专用和共享连接池。当一个线程关闭了一个非共享连接,则会返还到空闲连接池中等待下次使用。

如果底层DB-API模块是非线程安全的,线程锁会确保使用 PooledDB 是线程安全的。所以你并不需要为此担心,但是你在使用专用连接来改变数据库会话或执行多命令事务时必须小心。

 

版权声明:本文为open-yang原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/open-yang/p/14317632.html