多年一直从事数据相关工作。对数据开发存在的各种问题深有体会。数据处理工作主要有: 爬虫,ETL,机器学习。开发过程是构建数据处理的管道Pipeline的过程.  将各种模块拼接起来。总结步骤有:获取数据,转化,合并,存储,发送。数据研发工作和业务系统研发有着很多的差别。数据项目更多是铺管道过程,各模块通过数据依赖,而业务系统开发是建大楼过程。很多情况爬虫工程师,算法工程师,写出来的数据处理代码,非常混乱。因为在看到真实数据前,无法做准确的设计,更不用说性能上的要求。  前段时间花了大量时间对Asyncio库深入研究。决定开发了数据驱动框架,从模块化,灵活度,性能方面来解决数据处理工作的问题。这就我创立Databot开源框架的初衷。

花大半个月时间框架基本完成,能够解决处理数据处理工作,爬虫,ETL,量化交易。并有非常好的性能表现。欢迎大家使用和提意见。

项目地址:https://github.com/kkyon/databot

安装方法:pip3 install -U databot

代码案例:https://github.com/kkyon/databot/tree/master/examples 

多线程 VS 异步协程:

总的来说高并发的数据IO使用异步协程更具有优势。因为线程占用资源多,线程切换时候代价很大,所以建议的线程数都是cpu*2. Python由于GIL限制,通过多线程很难提升性能。

而通过asyncio可以达到非常的吞吐量。并发数几乎没有限制。

具体可以参考这篇文章:

https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html

在普通笔记本上 python asyncio 在9分钟 完成100万个网页请求。

 

Databot性能测试结果:

使用百度爬虫案例来作出:

有一批关键词,需要在百度搜索引擎。记录前十页的文章标题。在SEO,舆情等场景经常要做类似事情。测试中使用了100个关键字(需要抓取1000个网页)大概三分钟就能完成。测试环境结果如下:

# ---run result----
HTTP返回在1秒左右
#post man test result for a page requrest ;1100ms

ping的是时间42ms
# PING www.a.shifen.com (180.97.33.108): 56 data bytes
# 64 bytes from 180.97.33.108: icmp_seq=0 ttl=55 time=41.159 ms

Databot测试结果:每秒能抓取50个条目,每秒能处理6个网页。

# got len item 9274 speed:52.994286 per second,total cost: 175s
# got len item 9543 speed:53.016667 per second,total cost: 180s
# got len item 9614 speed:51.967568 per second,total cost: 185s




 

 

Python Asyncio 的问题:

asyncio本身,比如概念复杂,futrue,task,区别,ensure futer,crate_task。

协程编写要求对工程师高,特别在数据项目中。

asyncio支持的三方库有限,需要结合多线程和多进程来开发。

 

Databot理念和

数据工程师只关注核心逻辑,编写模块化函数,不需要考虑asyncio的特性。Databot将处理外部IO,并发,调度问题。

 

Databot基本概念:

Databot设计非常简洁,一共只有三个概念:Pipe,Route,Node

Pipe是主流程,一个程序可以有多个Pipe,相互联系或独立。Route,Node,都是包含在pipe内部。

Route是路由器,主要起数据路由,汇总合并作用。有Branch, Return,Fork,Join,BlockedJoin。其中Branch,Fork,不会改变主流程数据。Return,Join,会将处理后的数据放回到主流程中。可以通过嵌套Route,组合出复杂的数据网络。

Node是数据驱动节点。 处理数据逻辑节点,一些HTTP,Mysql,AioFile ,客户自定义函数,Timer,Loop都是属于Node.

 

 

如何安装Databot:

pip3 install -U databot

 

github地址:https://github.com/kkyon/databot

 

爬虫代码解析:

更多例子参照:https://github.com/kkyon/databot/tree/master/examples

 

针对百度爬虫例子,主流程代码如下:

get_all_items,是客户编写函数用于解析网页上的条目。
get_all_page_url 是自定义编写函数用于获取网页上的翻页链接。

p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px “.PingFang SC”; color: #454545 }
p.p2 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px “Helvetica Neue”; color: #454545 }
li.li1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px “.PingFang SC”; color: #454545 }
li.li2 { margin: 0.0px 0.0px 0.0px 0.0px; font: 12.0px “Helvetica Neue”; color: #454545 }
span.s1 { font: 12.0px “Helvetica Neue” }
span.s2 { font: 12.0px “.PingFang SC” }
ol.ol1 { list-style-type: decimal }

  1. Loop通过循环列表把,链接发送到pipe中。
  2. HTTPLoader将读入URL,下载HTML.生成HTTP response对象放入Pipe中
  3. Branch会拷贝一份数据(Httpresponse)导入分支中,然后get_all_items会解析成最终结果,存入文件中。此时主流程数据不受影响。仍然有一份HTTP response
  4. Branch拷贝pipe中的Httpresponse到分支,然后通过get_all_page_url解析全部翻页链接。然后通过HTTPloader下载相应的网页,解析保持。

以上每个步骤都会通过Databot框架调用和并发。

BotFrame.render('baiduspider')函数可以用于生产pipe的结构图。需要安装https://www.graphviz.org/download/

主函数代码:
 
 1 def main():
 2     words = ['贸易战', '世界杯']
 3     baidu_url = 'https://www.baidu.com/s?wd=%s'
 4     urls = [baidu_url % (word) for word in words]
 5 
 6 
 7     outputfile=aiofile('baidu.txt')
 8     Pipe(
 9         Loop(urls),
10         HttpLoader(),
11         Branch(get_all_items,outputfile),
12         Branch(get_all_page_url, HttpLoader(), get_all_items, outputfile),
13 
14     )
15 
16     #生成流程图
17     BotFrame.render('baiduspider')
18     BotFrame.run()
19 
20 
21 main()

 

下列是生成的流程图

 

 

全部代码:

 1 from databot.flow import Pipe, Branch, Loop
 2 from databot.botframe import BotFrame
 3 from bs4 import BeautifulSoup
 4 from databot.http.http import HttpLoader
 5 from databot.db.aiofile import aiofile
 6 import logging
 7 logging.basicConfig(level=logging.DEBUG)
 8 
 9 
10 
11 #定义解析结构
12 class ResultItem:
13 
14     def __init__(self):
15         self.id: str = ''
16         self.name: str = ''
17         self.url: str = ' '
18         self.page_rank: int = 0
19         self.page_no: int = 0
20 
21     def __repr__(self):
22         return  '%s,%s,%d,%d'%(str(self.id),self.name,self.page_no,self.page_rank)
23 
24 
25 # 解析具体条目
26 def get_all_items(response):
27     soup = BeautifulSoup(response.text, "lxml")
28     items = soup.select('div.result.c-container')
29     result = []
30     for rank, item in enumerate(items):
31         import uuid
32         id = uuid.uuid4()
33         r = ResultItem()
34         r.id = id
35         r.page_rank = rank
36         r.name = item.h3.get_text()
37         result.append(r)
38     return result
39 
40 
41 # 解析分页链接
42 def get_all_page_url(response):
43     itemList = []
44     soup = BeautifulSoup(response.text, "lxml")
45     page = soup.select('div#page')
46     for item in page[0].find_all('a'):
47         href = item.get('href')
48         no = item.get_text()
49         if '下一页' in no:
50             break
51         itemList.append('https://www.baidu.com' + href)
52 
53     return itemList
54 
55 
56 def main():
57     words = ['贸易战', '世界杯']
58     baidu_url = 'https://www.baidu.com/s?wd=%s'
59     urls = [baidu_url % (word) for word in words]
60 
61 
62     outputfile=aiofile('baidu.txt')
63     Pipe(
64         Loop(urls),
65         HttpLoader(),
66         Branch(get_all_items,outputfile),
67         Branch(get_all_page_url, HttpLoader(), get_all_items, outputfile),
68 
69     )
70     #生成流程图
71     BotFrame.render('baiduspider')
72     BotFrame.run()
73 
74 
75 main()

 

版权声明:本文为codemind原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/codemind/p/9535491.html