暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

分析型系统如何支持开放格式

手机用户2895 2023-08-25
240

### 综述

分析型系统百花齐放,数据在不同系统之间流转是非常常见的场景,开放格式的出现就是为了降低数据在不同系统间流转的代价。数据以开放格式存放在可以共享的存储设备上,一方面可以绕过执行层直接通过Load的方式来加载数据降低数据流转的代价,另外可以直接对数据进行查询分析消除加载数据的流程。因此数据格式相当于是定义了一套不同数据分析系统间的**数据交互协议。**


既然开放格式可以这么有效的降低数据流转的协议,为什么Clickhouse和Snowflake都主要使用自身私有的数据格式呢?最主要的考虑是**性能,**私有的数据格式可以更方便实现内部特殊的优化逻辑。因此Clickhouse和Snowflake都仅仅将开放格式作为与外界进行数据交互的通道。


Clickhouse几乎支持市面上所有开放格式的导入和导出, Snowflake仅支持少量几种数据格式的导入并且支持导出的数据格式更少。这跟两个产品的分发方式有很大的关系,Clickhouse作为开源的数据分析系统,更强的与生态的结合能力对于他的**流行**有很大帮助,而Snowflake作为闭源的数据分析系统,如何将数据留在内部对于他自身的**商业化**更为重要。


Clickhouse和Snowflake对于开放格式数据的支持,基本都停留在**静态数据**的层面,不支持更新、删除、甚至追加写。这一点跟数据湖系统还不太一样,目前比较流行的几个数据湖系统Delta Lake、Iceberg、Hudi都通过delta架构来实现了更新、删除、追加写。

###

### Snowflake

Snowflake内部的表私有格式进行存储,同时支持若干开放格式实现静态数据的upload和unload。Snowflake只只支持私有格式的更新,对于开放格式只支持读。


Snowflake目前支持的开放格式:


| 格式 | 支持导入 | 支持导出 |

| --- | --- | --- |

| CSV | Y | Y |

| JSON | Y | N |

| AVRO | Y | N |

| ORC | Y | N |

| PARQUET | Y | Y |

| XML | Y | N |


#### Load&UnLoad

Snowflake通过一组` Data Loading/Unloading DDL`和 `COPY INTO`实现导入外部数据、导出内部数据,主要包括以下四方面:


- Stage Management. Stage是SnowFlake内部的表与外界进行交互的暂存工作区。导入外部数据时,首先需要通过将外部数据文件的信息保存到stage中;导出内部数据时,首先需要通过`COPY INTO <location>`生成对应的数据文件。

- File Format Management:File Format用于定义数据交互时所使用的格式(Parquet、ORC、Avro...)以及特定格式的具体信息(压缩、分隔符...)。

- COPY INTO:`COPY INTO`实现Snowflake内部的表与stage中数据文件之间的数据交互。`COPY INTO <table>`实现将stage中的数据文件导入到Snowflake内部的表;COPY INTO <location>实现将Snowflake内部的表导出到stage中。具体的格式可以通过FILE_FORMAT参数指定为上面CREATE FILE FORMAT所创建的对象。

- Pipe Management: Pipe是对COPY INTO <table>的封装,供Snowpipe使用实现将外部数据导入到Snowflake内部。


Stage分为Internal stage、External stage两类。Internal stage用于操作本地数据文件,External stage用于操作云存储上的数据文件,详细参见[Stage](https://docs.snowflake.com/en/sql-reference/sql/create-stage.html#label-create-stage-externalstageparams)。使用Internal stage的时候需要通过`PUT/GET`将本地文件upload/unload到stage中,使用External stage的时候不需要额外的`PUT/GET`。另外跟云存储进行数据交互时,可以不经过stage而是在`COPY INTO`中直接指定云存储的相关访问信息。


##### Bulk Loading from a Local File System

![bulk load from local files.jpg](https://intranetproxy.alipay.com/skylark/lark/0/2022/jpeg/1954/1646211667759-872d952b-4aff-463d-8479-fedd7b1874ed.jpeg#clientId=ucaea60f4-dcba-4&from=ui&id=uda42c670&originHeight=1184&originWidth=1852&originalType=binary&ratio=1&rotation=0&showTitle=false&size=175700&status=done&style=none&taskId=u184f0cac-6f81-4731-85a2-1be21031ff5&title=)

##### Bulk Loading from Amazon S3

![bulk load data from s3.jpg](https://intranetproxy.alipay.com/skylark/lark/0/2022/jpeg/1954/1646211548973-e811596f-52f4-4fb5-afdc-9b73eb84eadb.jpeg#clientId=ucaea60f4-dcba-4&from=ui&id=udfcd849e&originHeight=1065&originWidth=1088&originalType=binary&ratio=1&rotation=0&showTitle=false&size=149597&status=done&style=none&taskId=u13f9d31e-ae4b-4dbb-b1bf-dec587fbd54&title=)


通过External stage导入。

```

copy into mytable

from @my_ext_stage

pattern='.*sales.*.csv';

```


不经过External stage直接导入

```

copy into mytable

from s3://mybucket/data/files credentials=(aws_key_id='$AWS_ACCESS_KEY_ID' aws_secret_key='$AWS_SECRET_ACCESS_KEY')

file_format = (format_name = my_csv_format);

```

##### Unloading into a Local File System

![unloading to local file system.jpg](https://intranetproxy.alipay.com/skylark/lark/0/2022/jpeg/1954/1646211866134-c6cdb986-dbf4-4694-973e-c9a44865c621.jpeg#clientId=ucaea60f4-dcba-4&from=ui&id=u8a1553b1&originHeight=901&originWidth=1398&originalType=binary&ratio=1&rotation=0&showTitle=false&size=117366&status=done&style=none&taskId=u9d82a6a3-8782-4cda-9c46-68ada0971d1&title=)


##### Unloading into Amazon S3

![unloading to amazon s3.jpg](https://intranetproxy.alipay.com/skylark/lark/0/2022/jpeg/1954/1646211962699-1d20750e-8062-4d09-bc33-0300dcaddde4.jpeg#clientId=ucaea60f4-dcba-4&from=ui&id=u4dfd0ad5&originHeight=841&originWidth=833&originalType=binary&ratio=1&rotation=0&showTitle=false&size=98164&status=done&style=none&taskId=u228b15f1-9dd1-4b29-a309-22bc6d942ce&title=)


导出到External stage。

```

create or replace stage my_ext_unload_stage url='s3://unload/files/'

storage_integration = s3_int

file_format = my_csv_unload_format;

copy into @my_ext_unload_stage/d1 from mytable;

```


直接导出到S3。

```

copy into s3://mybucket/unload/ from mytable storage_integration = s3_int;

```

#### Select on staged files

Snowflake直接查询staged file,这样对于静态的数据分析可以消除导入到内部的表这个过程的代价。与uploading/unloading不同的地方在于,对于云存储上的数据文件必须先建立对应的stage才能进行查询。


```

-- Create a file format.

create or replace file format myformat type = 'csv' field_delimiter = '|';


-- Create an internal stage.

create or replace stage mystage1;


-- Stage the data files.

put file:///tmp/data*.csv @mystage1;


-- Query the filename and row number metadata columns and the regular data columns in the staged file.

-- Optionally apply pattern matching to the set of files in the stage and optional path.

-- Note that the table alias is provided to make the statement easier to read and is not required.

select t.$1, t.$2 from @mystage1 (file_format => 'myformat', pattern=>'.*data.*[.]csv.gz') t;


+----+----+

| $1 | $2 |

|----+----|

| a | b |

| c | d |

| e | f |

| g | h |

+----+----+


select t.$1, t.$2 from @mystage1 t;


+-----+------+

| $1 | $2 |

|-----+------|

| a|b | NULL |

| c|d | NULL |

| e|f | NULL |

| g|h | NULL |

+-----+------+

```


### 命令

#### PUT&GET

**PUT:** Uploads data files from a local directory/folder on a client machine to one of the following Snowflake stages:


- Names internal stage.

- Internal stage for a specified table.

- Internal stage for a current user.


Once files are staged, the data in the files can be loaded into a table using the COPY INTO <table> command.


**GET: **Downloads data files from one of the following Snowflake stages to a local directory/folder on a client machine:


- Named internal stage.

- Internal stage for a specified table.

- Internal stage for the current user.


Typically, this command is executed after using the COPY INTO <location> command to unload data from table into a Snowflake stage.


#### LIST&REMOVE

**LIST: **Returns a list of files that have been staged in one of the following Snowflake stages:


- Named internal stage.

- Named external stage.

- Stage for a specified table.

- Stage for the current user


REMOVE: Removes files from either an external (external cloud storage) or internal (i.e. Snowflake) stage.

For internal stages, the following stage types are supported:


- Named internal stage

- Stage for a specified table

- Stage for the current user.


#### COPY INTO

**COPY INTO <table>:** Loads data from stages files to an existing table. The files must already be staged one of the following locations:


- Named internal stage(or table/user stage). Files can be stages using the PUT command.

- Named external stage that references an external location(Amazon S3, Google Cloud Storage, or Microsoft Azure).

- External location (Amazon S3, Google Cloud Storage, or Microsoft Azure).


COPY INTO <location>: Unloads data from a table (or query) into one or more files in one of the following locations:


- Named internal stage (or table/user stage). The files can then be downloaded from the stage/location using the GET command.

- Named external stage that references an external location (Amazon S3, Google Cloud Storage, or Microsoft Azure).

- External location (Amazon S3, Google Cloud Storage, or Microsoft Azure).


## Clickhouse

ClickHouse对于开放格式的支持比Snowflake更加的全面,包括支持更多类型的开发格式、丰富的数据格式转换方式。Clickhouse支持的数据类型太多,不一一列举了,详细参见[Format](https://clickhouse.com/docs/en/interfaces/formats/#formats)。Clickhouse通过三种方式支持S3:

包括Table function, Table engine,和作为MergeTree的后端存储, 其中Table function, Table engine支持读和全量的写,但不支持增量更新。在使用Table function, Table engine时对应S3的文件可以是常见的开放格式如ORC,Parquet等,内部通过引入了 Apache Arrow实现对各类开放格式的序列化/反序列化。


### Table engine

将S3作为CK的外表,并进行写入和读取。

```

CREATE TABLE s3_engine_table (name String, value UInt32)

ENGINE=S3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'gzip')

SETTINGS input_format_with_names_use_header = 0;


INSERT INTO s3_engine_table VALUES ('one', 1), ('two', 2), ('three', 3);


SELECT * FROM s3_engine_table LIMIT 2;

```


通过表函数s3直接读取S3上的数据文件。

```

SELECT *

FROM s3('https://storage.yandexcloud.net/my-test-bucket-768/data.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')

LIMIT 2;

```


### Table function


通过表函数s3直接读取本地文件上的数据文件

```

s3(path, [aws_access_key_id, aws_secret_access_key,] format, structure, [compression])


SELECT *

FROM s3('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/data.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')

LIMIT 2;

```


# S3 Backed MergeTree

将s3配置成一个虚拟的disk对接merge tree

```cpp

<clickhouse>

<storage_configuration>

...

<disks>

<s3>

<type>s3</type>

<endpoint>https://sample-bucket.s3.us-east-2.amazonaws.com/tables/</endpoint>

<access_key_id>your_access_key_id</access_key_id>

<secret_access_key>your_secret_access_key</secret_access_key>

<region></region>

<metadata_path>/var/lib/clickhouse/disks/s3/</metadata_path>

<cache_enabled>true</cache_enabled>

<data_cache_enabled>true</data_cache_enabled>

<cache_path>/var/lib/clickhouse/disks/s3/cache/</cache_path>

</s3>

</disks>

...

</storage_configuration>

</clickhouse>

```


### Select ... Format

Clickhouse支持在查询的时候指定最终的输出格式,结合查询结果的重定向,可以实现灵活的数据格式转换。

```cpp

SELECT * FROM MERGE_TREE_TABLE FORMAT ORC;

```

## Refrence


1. [https://community.snowflake.com/s/article/Working-with-ORC-Data-in-Snowflake](https://community.snowflake.com/s/article/Working-with-ORC-Data-in-Snowflake)

2. Snowflake stage: [https://dwgeek.com/type-of-snowflake-stages-how-to-create-and-use.html/](https://dwgeek.com/type-of-snowflake-stages-how-to-create-and-use.html/)

3. Snowflake Internal Stage:[https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html](https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html)

4. Snowflake PUT,upload data files from a local directory/folder on a client machine to Snowflake stages: Named internal stage, Internal stage for a specified table, Internal stage for the current user:[https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html](https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html)

5. Snowflake GET, downloads data files from one of the following Snowflake stages(like PUT) to a local directory/folder on a client machine

6. Clickhouse File engine:[https://clickhouse.com/docs/en/engines/table-engines/special/file/](https://clickhouse.com/docs/en/engines/table-engines/special/file/)

7. Clickhouse S3 engine: [https://clickhouse.com/docs/en/engines/table-engines/integrations/s3/](https://clickhouse.com/docs/en/engines/table-engines/integrations/s3/)

8. [https://aws.amazon.com/cn/blogs/china/explore-three-ways-to-combine-clickhouse-and-amazon-s3/](https://aws.amazon.com/cn/blogs/china/explore-three-ways-to-combine-clickhouse-and-amazon-s3/)



「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论