暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Python访问ClickHouse的方法汇总

alitrack 2022-03-04
4625

ClickHouse 服务器先白嫖网上的好了(免安装 Clickhouse 体验单表 40 亿行大数据的方法)

clickhouse-sqlalchemy

Superset 连接 ClickHouse 这篇文章里已经介绍了这个包。

安装 clickhouse-sqlalchemy

pip install clickhouse-sqlalchemy

接口支持

  • native 推荐通过 clickhouse-driver[1]
  • http 通过请求

连接参数

ClickHouse SQLAlchemy 连接字符串使用以下语法:

'clickhouse+<driver>://<username>:<password>@<host>:<port>/<database>[?key=value..]'

  • driver , 使用的驱动程序。可能的选择:http
    native
    。默认 http

  • database, 连接的数据库。默认为 default

  • port, 端口

  • 选用 HTTP 接口时,

    • port 是 ClickHouse 服务器绑定的端口。默认为 8123
    • 以秒为单位超时 。默认情况下没有超时。
    • 协议 可选择:http
      https
      。默认 http

默认 ClickHouse 安装中到数据库测试的连接字符串:

'clickhouse://default:@localhost/test'

当您使用 nginx 作为 ClickHouse 的代理服务器时,服务器连接字符串可能如下所示:

'clickhouse://user:password@example.com:8124/test?protocol=https'

8124
代理端口。

protocol=https
表示使用 https 协议

使用示例,

  • native 接口
import pandas as pd
import sqlalchemy as sql
url = "clickhouse+native://play:@gh-api.clickhouse.tech/default?secure=true"
engine = sql.create_engine(url)

df=pd.read_sql("select * from dns limit 3 ",engine)
df


#自动做了数据类型映射
df.dtypes
timestamp    datetime64[ns]
domain               object
a                    object
aaaa                 object
cname                object
dtype: object

  • http 接口
from sqlalchemy import create_engine
from requests import Session
import pandas as pd

uri = "clickhouse://default:qwert@localhost:8123/default"
engine = create_engine(uri)
df=pd.read_sql("show databases ",engine)
df


requests

如果因为某些原因不能使用 clickhouse-sqlalchemy,可以使用 requests,因为 clickhouse 提供了 http 接口。

定义获取数据的函数

import requests
import pandas as pd
def get_data(query, host, user_name='', user_passwd=''):
    r = requests.post(host, data=f"{query} FORMAT JSONCompact",
                      auth=(user_name, user_passwd), verify=SSL_VERIFY)
    if r.status_code==200:
        return r.json()
    else:
        return None

ClickHouse 可以接受和返回各种格式[2]的数据。受支持的输入格式可用于提交给 INSERT
语句、从文件表(File,URL,HDFS 或者外部目录)执行 SELECT
语句,受支持的输出格式可用于格式化 SELECT
语句的返回结果,或者通过 INSERT
写入到文件表。

读取数据

query ="select * from dns limit 3"
host="https://@gh-api.clickhouse.tech/default"
user_name="play"
user_passwd=""
SSL_VERIFY=True
data =get_data(query, host, user_name, user_passwd)

从 data 的输出可以看到,在这里使用 FORMAT JSONCompact
是最方便的,

{'meta': [{'name': 'timestamp', 'type': 'DateTime'},
  {'name': 'domain', 'type': 'String'},
  {'name': 'a', 'type': 'IPv4'},
  {'name': 'aaaa', 'type': 'IPv6'},
  {'name': 'cname', 'type': 'String'}],
 'data': [['2021-07-30 10:55:06',
   '&nbsp;nhl172538.w197-e1.ezcname.com',
   '59.188.181.151',
   '::',
   ''],
  ['2021-07-30 10:23:02',
   "'intranet-tcc121.msappproxy.net",
   '0.0.0.0',
   '::',
   'proxy-appproxy-cus-dsm02p-3.msappproxy.net'],
  ['2021-07-30 04:00:30',
   '*.10milstartups.com.s.strikinglydns.com',
   '100.25.195.121',
   '::',
   '']],
 'rows': 3,
 'rows_before_limit_at_least': 3,
 'statistics': {'elapsed': 0.004231434, 'rows_read': 6, 'bytes_read': 484}}

  • meta 部分为字段名和数据类型

  • data 部分为数据(二维数组)

根据返回结果生成 Python DataFrame

df=pd.DataFrame(data['data'],columns=[x['name'for x in data['meta']])
df.dtypes

timestamp    object
domain       object
a            object
aaaa         object
cname        object
dtype: object

根据 data['meta']的数据类型信息,对 DataFrame 做进一步处理

df.timestamp = pd.to_datetime(df.timestamp)

timestamp
应该为日期类型,

a
aaaa
分别为 ipv4 和 ipv6,DataFrame 不支持这些数据类型,处理成字符串类型

如果想想实现自动映射,需要建立一个映射表(参考 clickhouse-sqlalchemy[3])。

参考资料


[1]

clickhouse-driver: https://github.com/mymarilyn/clickhouse-driver

[2]

接受和返回各种格式: https://clickhouse.com/docs/zh/interfaces/formats/

[3]

clickhouse-sqlalchemy: https://github.com/xzkostyan/clickhouse-sqlalchemy/tree/master/clickhouse_sqlalchemy/types


文章转载自alitrack,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论