PyHivePython 语言编写的用于操作 Hive 的简便工具库。

一、PyHive安装

# Liunx系统
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive

# Windows系统会出现莫名其妙的报错

二、访问

PyHive 连接 Hive 一般流程:

  • 创建连接
  • 获取游标
  • 执行SQL语句
  • 获取结果
  • 关闭连接
# 加载包
from pyhive import hive

# 建立连接
conn = hive.connect(host = '100.100.100.100',      # 主机
                    port = 10000,                  # 端口 
                    auth = 'xxx',                  # 用户
                    kerberos_service_name = 'hive', # 服务
                    database = 'xxx'            # 数据库
                    password = 'xxx')
                    
# 查询
cursor = conn.cursor()
cursor.execute('select * from table limit 10')
for result in cursor.fetchall():
    print(result)
'''
('1','a','100')
('2','b','200')
('3','c','300')
'''
    
# 关闭连接
cursor.close()
conn.close()

其中,cursor.fetchall() 返回的是一个 list 对象,并且每一个元素都是一个 tuple 对象。

需要对其进行一定的处理,才能转换为建模需要的 DataFrame

三、函数封装

# 函数封装
def get_data(params, sql_text, is_all_col=1):
    '''
    is_all_col: 是否选取所有的列 select *
    '''
    # 建立连接
    con = hive.connect(host = params.get('ip'),
                       port = params.get('port'),
                       auth = params.get('auth'),
                       kerberos_service_name = params.get('kerberos_service_name'),
                       database = params.get('database'),
                       password = params.get('password'))
    cursor = con.cursor()
    cursor.execute(sql_text)
    # 列名
    col_tmp = cursor.description
    col = list()
    if is_all_col == 1:
        for i in range(len(col_tmp)):
            col.append(col_tmp[i][0].split('.')[1])
    else:
        for i in range(len(col_tmp)):
            col.append(col_tmp[i][0])
    # 数据
    data = cursor.fetchall()
    result = pd.DataFrame(list(data), columns=col)
    # 关闭连接 释放资源
    cursor.close()
    con.close()
    
    return result

if __name__ == '__main__':

    import pandas as pd
    import numpy as np
    
    params = {
        'ip': 'xxx.xxx.xxx.xxx',
        'port': '1000',
        'auth': 'hider',
        'kerberos_service_name': 'hive',
        'database': 'hive',
        'password': '100',
    }
    
    sql_text1 = 'select * from table limit 5'
    sql_text2 = 'select a, b, c from table limit 5'
    
    # 所有列
    data1 = get_data(params, sql_text1, is_all_col=1)
    # 指定列
    data2 = get_data(params, sql_text2, is_all_col=0)
    

四、高级用法

1.Hive配置

Hive 有许多必要的参数设置,通过 Connection 类的 configuration 参数可进行配置。

hive_config = {
    'mapreduce.job.queuename': 'my_hive',
    'hive.exec.compress.output': 'false',
    'hive.exec.compress.intermediate': 'true',
    'mapred.min.split.size.per.node': '1',
    'mapred.min.split.size.per.rack': '1',
    'hive.map.aggr': 'true',
    'hive.groupby.skewindata': 'true'
}

conn = hive.connect(host = '',
                    port = '',
                    ...
                    configuration = hive_config)

2.列名

Cursor 类中有一个 description 方法,可以获取数据表中的列名、数据类型等信息。

col = cursor.description
col_names = list()
for column in col:
	col_names.append(column[0]) # 提取第一个元素:列名

3.执行脚本带参数

游标所执行的脚本可以不写死,通过参数的方式进行配置。

month = 202205

# %占位符
sql_text = 'select * from table where month = %d limit 5' % month
print(sql_text)

# format
sql_text = 'select * from table where month = {} limit 5'.format(month)

# f-string
sql_text = f'select * from table where month = {month} limit 5'
print(sql_text)

参考链接:使用PyHive操作Hive

参考链接:使用PyHive连接Hive数据仓库

参考链接:PYTHON访问hive数据,并返回DataFrame代码封装

参考链接:python连接hive_python3.6如何连接hive?

版权声明:本文为Hider1214原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/hider/p/16279747.html