Python学习笔记:PyHive连接Hive数据库
PyHive
是 Python
语言编写的用于操作 Hive
的简便工具库。
一、PyHive安装
# Liunx系统
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
# Windows系统会出现莫名其妙的报错
二、访问
PyHive
连接 Hive
一般流程:
- 创建连接
- 获取游标
- 执行SQL语句
- 获取结果
- 关闭连接
# 加载包
from pyhive import hive
# 建立连接
conn = hive.connect(host = '100.100.100.100', # 主机
port = 10000, # 端口
auth = 'xxx', # 用户
kerberos_service_name = 'hive', # 服务
database = 'xxx' # 数据库
password = 'xxx')
# 查询
cursor = conn.cursor()
cursor.execute('select * from table limit 10')
for result in cursor.fetchall():
print(result)
'''
('1','a','100')
('2','b','200')
('3','c','300')
'''
# 关闭连接
cursor.close()
conn.close()
其中,cursor.fetchall()
返回的是一个 list
对象,并且每一个元素都是一个 tuple
对象。
需要对其进行一定的处理,才能转换为建模需要的 DataFrame
。
三、函数封装
# 函数封装
def get_data(params, sql_text, is_all_col=1):
'''
is_all_col: 是否选取所有的列 select *
'''
# 建立连接
con = hive.connect(host = params.get('ip'),
port = params.get('port'),
auth = params.get('auth'),
kerberos_service_name = params.get('kerberos_service_name'),
database = params.get('database'),
password = params.get('password'))
cursor = con.cursor()
cursor.execute(sql_text)
# 列名
col_tmp = cursor.description
col = list()
if is_all_col == 1:
for i in range(len(col_tmp)):
col.append(col_tmp[i][0].split('.')[1])
else:
for i in range(len(col_tmp)):
col.append(col_tmp[i][0])
# 数据
data = cursor.fetchall()
result = pd.DataFrame(list(data), columns=col)
# 关闭连接 释放资源
cursor.close()
con.close()
return result
if __name__ == '__main__':
import pandas as pd
import numpy as np
params = {
'ip': 'xxx.xxx.xxx.xxx',
'port': '1000',
'auth': 'hider',
'kerberos_service_name': 'hive',
'database': 'hive',
'password': '100',
}
sql_text1 = 'select * from table limit 5'
sql_text2 = 'select a, b, c from table limit 5'
# 所有列
data1 = get_data(params, sql_text1, is_all_col=1)
# 指定列
data2 = get_data(params, sql_text2, is_all_col=0)
四、高级用法
1.Hive配置
Hive
有许多必要的参数设置,通过 Connection
类的 configuration
参数可进行配置。
hive_config = {
'mapreduce.job.queuename': 'my_hive',
'hive.exec.compress.output': 'false',
'hive.exec.compress.intermediate': 'true',
'mapred.min.split.size.per.node': '1',
'mapred.min.split.size.per.rack': '1',
'hive.map.aggr': 'true',
'hive.groupby.skewindata': 'true'
}
conn = hive.connect(host = '',
port = '',
...
configuration = hive_config)
2.列名
Cursor
类中有一个 description
方法,可以获取数据表中的列名、数据类型等信息。
col = cursor.description
col_names = list()
for column in col:
col_names.append(column[0]) # 提取第一个元素:列名
3.执行脚本带参数
游标所执行的脚本可以不写死,通过参数的方式进行配置。
month = 202205
# %占位符
sql_text = 'select * from table where month = %d limit 5' % month
print(sql_text)
# format
sql_text = 'select * from table where month = {} limit 5'.format(month)
# f-string
sql_text = f'select * from table where month = {month} limit 5'
print(sql_text)
参考链接:使用PyHive操作Hive
参考链接:使用PyHive连接Hive数据仓库