开源项目推荐 Databot: Python高性能数据驱动开发框架--爬虫案例
多年一直从事数据相关工作。对数据开发存在的各种问题深有体会。数据处理工作主要有: 爬虫,ETL,机器学习。开发过程是构建数据处理的管道Pipeline的过程. 将各种模块拼接起来。总结步骤有:获取数据,转化,合并,存储,发送。数据研发工作和业务系统研发有着很多的差别。数据项目更多是铺管道过程,各模块通过数据依赖,而业务系统开发是建大楼过程。很多情况爬虫工程师,算法工程师,写出来的数据处理代码,非常混乱。因为在看到真实数据前,无法做准确的设计,更不用说性能上的要求。 前段时间花了大量时间对Asyncio库深入研究。决定开发了数据驱动框架,从模块化,灵活度,性能方面来解决数据处理工作的问题。这就我创立Databot开源框架的初衷。
花大半个月时间框架基本完成,能够解决处理数据处理工作,爬虫,ETL,量化交易。并有非常好的性能表现。欢迎大家使用和提意见。
项目地址:https://github.com/kkyon/databot
安装方法:pip3 install -U databot
代码案例:https://github.com/kkyon/databot/tree/master/examples
多线程 VS 异步协程:
总的来说高并发的数据IO使用异步协程更具有优势。因为线程占用资源多,线程切换时候代价很大,所以建议的线程数都是cpu*2. Python由于GIL限制,通过多线程很难提升性能。
而通过asyncio可以达到非常的吞吐量。并发数几乎没有限制。
具体可以参考这篇文章:
https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html
在普通笔记本上 python asyncio 在9分钟 完成100万个网页请求。
Databot性能测试结果:
使用百度爬虫案例来作出:
有一批关键词,需要在百度搜索引擎。记录前十页的文章标题。在SEO,舆情等场景经常要做类似事情。测试中使用了100个关键字(需要抓取1000个网页)大概三分钟就能完成。测试环境结果如下:
# ---run result----
HTTP返回在1秒左右
#post man test result for a page requrest ;1100ms
ping的是时间42ms
# PING www.a.shifen.com (180.97.33.108): 56 data bytes
# 64 bytes from 180.97.33.108: icmp_seq=0 ttl=55 time=41.159 ms
Databot测试结果:每秒能抓取50个条目,每秒能处理6个网页。
# got len item 9274 speed:52.994286 per second,total cost: 175s
# got len item 9543 speed:53.016667 per second,total cost: 180s
# got len item 9614 speed:51.967568 per second,total cost: 185s
Python Asyncio 的问题:
asyncio本身,比如概念复杂,futrue,task,区别,ensure futer,crate_task。
协程编写要求对工程师高,特别在数据项目中。
asyncio支持的三方库有限,需要结合多线程和多进程来开发。
Databot理念和
数据工程师只关注核心逻辑,编写模块化函数,不需要考虑asyncio的特性。Databot将处理外部IO,并发,调度问题。
Databot基本概念:
Databot设计非常简洁,一共只有三个概念:Pipe,Route,Node
Pipe是主流程,一个程序可以有多个Pipe,相互联系或独立。Route,Node,都是包含在pipe内部。
Route是路由器,主要起数据路由,汇总合并作用。有Branch, Return,Fork,Join,BlockedJoin。其中Branch,Fork,不会改变主流程数据。Return,Join,会将处理后的数据放回到主流程中。可以通过嵌套Route,组合出复杂的数据网络。
Node是数据驱动节点。 处理数据逻辑节点,一些HTTP,Mysql,AioFile ,客户自定义函数,Timer,Loop都是属于Node.
如何安装Databot:
pip3 install -U databot
github地址:https://github.com/kkyon/databot
爬虫代码解析:
更多例子参照:https://github.com/kkyon/databot/tree/master/examples
针对百度爬虫例子,主流程代码如下:
get_all_items,是客户编写函数用于解析网页上的条目。
get_all_page_url 是自定义编写函数用于获取网页上的翻页链接。
p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px “.PingFang SC”; color: #454545 }
p.p2 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px “Helvetica Neue”; color: #454545 }
li.li1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px “.PingFang SC”; color: #454545 }
li.li2 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px “Helvetica Neue”; color: #454545 }
span.s1 { font: 12.0px “Helvetica Neue” }
span.s2 { font: 12.0px “.PingFang SC” }
ol.ol1 { list-style-type: decimal }
- Loop通过循环列表把,链接发送到pipe中。
- HTTPLoader将读入URL,下载HTML.生成HTTP response对象放入Pipe中
- Branch会拷贝一份数据(Httpresponse)导入分支中,然后get_all_items会解析成最终结果,存入文件中。此时主流程数据不受影响。仍然有一份HTTP response
- Branch拷贝pipe中的Httpresponse到分支,然后通过get_all_page_url解析全部翻页链接。然后通过HTTPloader下载相应的网页,解析保持。
以上每个步骤都会通过Databot框架调用和并发。
BotFrame.render('baiduspider')函数可以用于生产pipe的结构图。需要安装https://www.graphviz.org/download/
主函数代码:
1 def main(): 2 words = ['贸易战', '世界杯'] 3 baidu_url = 'https://www.baidu.com/s?wd=%s' 4 urls = [baidu_url % (word) for word in words] 5 6 7 outputfile=aiofile('baidu.txt') 8 Pipe( 9 Loop(urls), 10 HttpLoader(), 11 Branch(get_all_items,outputfile), 12 Branch(get_all_page_url, HttpLoader(), get_all_items, outputfile), 13 14 ) 15 16 #生成流程图 17 BotFrame.render('baiduspider') 18 BotFrame.run() 19 20 21 main()
下列是生成的流程图
全部代码:
1 from databot.flow import Pipe, Branch, Loop 2 from databot.botframe import BotFrame 3 from bs4 import BeautifulSoup 4 from databot.http.http import HttpLoader 5 from databot.db.aiofile import aiofile 6 import logging 7 logging.basicConfig(level=logging.DEBUG) 8 9 10 11 #定义解析结构 12 class ResultItem: 13 14 def __init__(self): 15 self.id: str = '' 16 self.name: str = '' 17 self.url: str = ' ' 18 self.page_rank: int = 0 19 self.page_no: int = 0 20 21 def __repr__(self): 22 return '%s,%s,%d,%d'%(str(self.id),self.name,self.page_no,self.page_rank) 23 24 25 # 解析具体条目 26 def get_all_items(response): 27 soup = BeautifulSoup(response.text, "lxml") 28 items = soup.select('div.result.c-container') 29 result = [] 30 for rank, item in enumerate(items): 31 import uuid 32 id = uuid.uuid4() 33 r = ResultItem() 34 r.id = id 35 r.page_rank = rank 36 r.name = item.h3.get_text() 37 result.append(r) 38 return result 39 40 41 # 解析分页链接 42 def get_all_page_url(response): 43 itemList = [] 44 soup = BeautifulSoup(response.text, "lxml") 45 page = soup.select('div#page') 46 for item in page[0].find_all('a'): 47 href = item.get('href') 48 no = item.get_text() 49 if '下一页' in no: 50 break 51 itemList.append('https://www.baidu.com' + href) 52 53 return itemList 54 55 56 def main(): 57 words = ['贸易战', '世界杯'] 58 baidu_url = 'https://www.baidu.com/s?wd=%s' 59 urls = [baidu_url % (word) for word in words] 60 61 62 outputfile=aiofile('baidu.txt') 63 Pipe( 64 Loop(urls), 65 HttpLoader(), 66 Branch(get_all_items,outputfile), 67 Branch(get_all_page_url, HttpLoader(), get_all_items, outputfile), 68 69 ) 70 #生成流程图 71 BotFrame.render('baiduspider') 72 BotFrame.run() 73 74 75 main()