1. Development Background
Know yourself and know your adversary, and you will never lose a battle. Since we are going to develop an Elasticsearch bulk-insert plugin, let's first get acquainted with Elasticsearch.
Elasticsearch is a real-time distributed storage, search, and analytics engine. It makes it possible to work with big data at a speed that was previously out of reach. It is used for full-text search, structured search, analytics, or any combination of the three.
Elasticsearch is also written in Java and uses Lucene at its core for all indexing and search features, but it aims to hide Lucene's complexity behind a simple RESTful API, making full-text search easy to use.
Elasticsearch is fast, remarkably so. Its design uses finite state transducers for the inverted index that powers full-text search, BKD trees for storing numeric and geo data, and a column store for analytics.
Comparison between Elasticsearch and a traditional RDBMS

| No. | Elasticsearch | RDBMS |
| --- | --- | --- |
| 1 | index | database |
| 2 | mapping | schema |
| 3 | type (deprecated after 6.x) | table |
| 4 | document | row |
| 5 | field | column |
| 6 | inverted index | index |
| 7 | DSL | SQL |
| 8 | GET http://…… | select * from tablename |
| 9 | PUT http://…… | update tablename set col = val |
| 10 | DELETE http://…… | delete from tablename |
2. Index Creation
The app-pentaho-es6 and app-pentaho-es7 plugins are built on the Kettle platform. The working principle is to take the fields of the incoming data stream and write them to dynamically named, user-defined indices; this is how the ElasticSearch Bulk Insert step performs its bulk writes. So we first need to understand how to use an index template (kettle-es) to create dynamic, date-partitioned indices (kettle-es_*) for writing, and how to search them through the alias kettle-es-query. A concrete example follows:
PUT _template/kettle-es
{
  "index_patterns": ["kettle-es_*"],
  "settings": {
    "index": {
      "max_result_window": "100000",
      "number_of_shards": "3",
      "number_of_replicas": "1"
    }
  },
  "aliases": {
    "kettle-es-query": {}
  },
  "mappings": {
    "properties": {
      "agent_ip": { "type": "ip" },
      "record_id": { "type": "integer" },
      "start_time": { "format": "yyyy-MM-dd HH:mm:ss", "type": "date" },
      "fire_time": { "format": "yyyy-MM-dd HH:mm:ss", "type": "date" },
      "end_time": { "format": "yyyy-MM-dd HH:mm:ss", "type": "date" },
      "pid": { "type": "keyword" },
      "message": { "index": false, "type": "text" }
    }
  }
}
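With this template in place, any index whose name matches kettle-es_* automatically picks up the settings, mappings, and the kettle-es-query alias. Below is a minimal Java sketch of how a per-day index name could be derived from a prefix and the current date; buildDailyIndexName and the yyyyMMdd suffix are hypothetical choices for illustration, not part of the plugin.

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DailyIndexNameExample {

    // Hypothetical helper: joins the index prefix and the current date so that
    // documents land in a per-day index matched by the "kettle-es_*" pattern.
    static String buildDailyIndexName(String prefix, LocalDate date) {
        return prefix + "_" + date.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
    }

    public static void main(String[] args) {
        // Prints something like "kettle-es_20240101"; writes target the daily index,
        // while searches go through the "kettle-es-query" alias defined in the template.
        System.out.println(buildDailyIndexName("kettle-es", LocalDate.now()));
    }
}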
3. ElasticSearch Bulk Insert Source Code Overview
3.1 Source Code Directory Structure
Whether you look at app-pentaho-es6 or app-pentaho-es7, the directory structure is the same: the ElasticSearchBulk step class, the ElasticSearchBulkData data class, the ElasticSearchBulkMeta metadata class, the ElasticSearchBulkDialog dialog class, and the message configuration for log messages and prompts. See the source code for details.
3.1.1 Directory Structure Comparison


3.1.2 Maven Configuration Comparison
<!-- app-pentaho-es6: Elasticsearch 6.4.2 with the TransportClient dependencies -->
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.7</maven.compiler.source>
  <maven.compiler.target>1.7</maven.compiler.target>
  <pdi.version>6.1.0.1-SNAPSHOT</pdi.version>
  <build.revision>${project.version}</build.revision>
  <timestamp>${maven.build.timestamp}</timestamp>
  <build.description>${project.description}</build.description>
  <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
  <elasticsearch.version>6.4.2</elasticsearch.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
    <scope>compile</scope>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
    <scope>compile</scope>
  </dependency>
</dependencies>
<!-- app-pentaho-es7: Elasticsearch 7.2.0 with the high-level REST client dependencies -->
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <pdi.version>6.1.0.1-SNAPSHOT</pdi.version>
  <build.revision>${project.version}</build.revision>
  <timestamp>${maven.build.timestamp}</timestamp>
  <build.description>${project.description}</build.description>
  <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
  <elasticsearch.version>7.2.0</elasticsearch.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
    <scope>compile</scope>
  </dependency>
</dependencies>
3.2 The app-pentaho-es6 Plugin
3.2.1 TransportClient API Notes
The app-pentaho-es6 plugin uses TransportClient, which connects remotely to the cluster through the transport module. Elastic has since deprecated TransportClient in Elasticsearch 7.0 and removed it completely in 8.0.
Creating a TransportClient connected to the cluster used to be the most common way to obtain an Elasticsearch client. It does not join the cluster; it simply takes one or more initial transport addresses and communicates with them in round-robin fashion on every operation.
3.2.2 Client Initialization
(1) Configure the PreBuiltTransportClient with the cluster name, addresses, ports, protocol, and other settings, and register the TransportAddress entries
(2) Test that the specified index can be reached and obtain the Client
private void initClient() throws UnknownHostException {
  Settings.Builder settingsBuilder = Settings.builder();
  settingsBuilder.put(Settings.Builder.EMPTY_SETTINGS);
  // Copy the step's Settings tab entries (cluster.name, etc.) into the client settings.
  Map<String, String> tMetaMap = meta.getSettingsMap();
  Iterator<Entry<String, String>> iter = tMetaMap.entrySet().iterator();
  while (iter.hasNext()) {
    Entry<String, String> entry = iter.next();
    settingsBuilder.put(entry.getKey(), environmentSubstitute(entry.getValue()));
  }
  // Register every configured server as a transport address; requests are round-robined across them.
  PreBuiltTransportClient tClient = new PreBuiltTransportClient(settingsBuilder.build());
  for (Server server : meta.getServers()) {
    tClient.addTransportAddress(new TransportAddress(
        InetAddress.getByName(environmentSubstitute(server.getAddress())), server.getPort()));
  }
  client = tClient;
  /*
   * With the upgrade to elasticsearch 6.3.0, removed the NodeBuilder,
   * which was removed from the elasticsearch 5.0 API, see:
   * https://www.elastic.co/guide/en/elasticsearch/reference/5.0/breaking_50_java_api_changes.html#_nodebuilder_removed
   */
}
3.2.3 Data Stream Processing
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
  Object[] rowData = getRow();
  if (rowData == null) {
    if (currentRequest != null && currentRequest.numberOfActions() > 0) {
      // didn't fill a whole batch
      processBatch(false);
    }
    setOutputDone();
    return false;
  }
  if (first) {
    first = false;
    setupData();
    currentRequest = client.prepareBulk();
    requestsBuffer = new ArrayList<IndexRequestBuilder>(this.batchSize);
    initFieldIndexes();
  }
  try {
    data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
    return indexRow(data.inputRowMeta, rowData) || !stopOnError;
  } catch (KettleStepException e) {
    throw e;
  } catch (Exception e) {
    rejectAllRows(e.getLocalizedMessage());
    String msg = BaseMessages.getString(PKG, "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage());
    logError(msg);
    throw new KettleStepException(msg, e);
  }
}
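Each incoming row is handed to indexRow, whose implementation is not shown above. The following is only a plausible sketch of such a method for the TransportClient version, reusing the step's client, currentRequest, requestsBuffer, and batchSize fields; rowToJsonMap and buildIndexName are hypothetical helpers for converting the Kettle row to a field map and for computing the dynamic index name.

// Hypothetical sketch, not the plugin's actual indexRow implementation.
private boolean indexRow(RowMetaInterface rowMeta, Object[] row) throws KettleStepException {
  try {
    // Convert the Kettle row into a field -> value map (hypothetical helper).
    Map<String, Object> source = rowToJsonMap(rowMeta, row);

    // Build one index request against the dynamic daily index and buffer it.
    IndexRequestBuilder requestBuilder =
        client.prepareIndex(buildIndexName(), "_doc").setSource(source);
    requestsBuffer.add(requestBuilder);
    currentRequest.add(requestBuilder);

    // Flush as soon as the configured batch size is reached.
    if (currentRequest.numberOfActions() >= batchSize) {
      return processBatch(true);
    }
    return true;
  } catch (Exception e) {
    throw new KettleStepException(e);
  }
}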
3.2.4 Batch Processing
private boolean processBatch(boolean makeNew) throws KettleStepException {
  ActionFuture<BulkResponse> actionFuture = currentRequest.execute();
  boolean responseOk = false;
  BulkResponse response = null;
  try {
    if (timeout != null && timeoutUnit != null) {
      response = actionFuture.actionGet(timeout, timeoutUnit);
    } else {
      response = actionFuture.actionGet();
    }
  } catch (ElasticsearchException e) {
    String msg = BaseMessages.getString(PKG, "ElasticSearchBulk.Error.BatchExecuteFail", e.getLocalizedMessage());
    if (e instanceof ElasticsearchTimeoutException) {
      msg = BaseMessages.getString(PKG, "ElasticSearchBulk.Error.Timeout");
    }
    logError(msg);
    rejectAllRows(msg);
  }
  if (response != null) {
    responseOk = handleResponse(response);
    requestsBuffer.clear();
  } else { // have to assume all failed
    numberOfErrors += currentRequest.numberOfActions();
    setErrors(numberOfErrors);
  }
  // duration += response.getTookInMillis(); //just in trunk..
  if (makeNew) {
    currentRequest = client.prepareBulk();
    data.nextBufferRowIdx = 0;
    data.inputRowBuffer = new Object[batchSize][];
  } else {
    currentRequest = null;
    data.inputRowBuffer = null;
  }
  return responseOk;
}
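handleResponse, which decides whether the batch succeeded, is also not shown here. A minimal sketch of what such a method might do, assuming it simply walks the per-item results and counts failures; the same sketch would work for the app-pentaho-es7 plugin, since BulkResponse and BulkItemResponse are the same classes in both APIs.

// Hypothetical sketch of handleResponse, not the plugin's actual code.
private boolean handleResponse(BulkResponse response) {
  boolean ok = true;
  for (BulkItemResponse item : response.getItems()) {
    if (item.isFailed()) {
      ok = false;
      numberOfErrors++;
      logError(item.getFailureMessage()); // failure reason reported by Elasticsearch
    }
  }
  setErrors(numberOfErrors);
  return ok;
}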
3.3 The app-pentaho-es7 Plugin
3.3.1 RestHighLevelClient API Notes
The app-pentaho-es7 plugin is based on RestHighLevelClient, the Java high-level REST client provided by Elasticsearch, which sends HTTP requests instead of serialized Java requests. The Java client is mainly used to:
(1) Perform standard index, get, delete, and search operations on an existing cluster
(2) Perform administrative tasks on a running cluster
3.3.2 Client Initialization
(1) Initialize Elasticsearch authentication with a CredentialsProvider
(2) Configure the RestHighLevelClient with the cluster name, addresses, ports, protocol, and other settings via the setHttpClientConfigCallback callback
(3) Test that the specified index can be reached and obtain the Client
private void initClient() throws UnknownHostException {
  Settings.Builder settingsBuilder = Settings.builder();
  settingsBuilder.put(Settings.Builder.EMPTY_SETTINGS);
  // Copy the step's Settings tab entries (cluster.name, es.user, es.password, ...) into the builder.
  meta.getSettingsMap().entrySet().stream()
      .forEach((s) -> settingsBuilder.put(s.getKey(), environmentSubstitute(s.getValue())));
  RestHighLevelClient rclient = null;
  // Basic authentication built from the es.user / es.password settings.
  final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
  credentialsProvider.setCredentials(AuthScope.ANY,
      new UsernamePasswordCredentials(settingsBuilder.get("es.user"), settingsBuilder.get("es.password")));
  // Note: each iteration replaces rclient, so only the last configured server ends up in the client.
  for (Server server : meta.getServers()) {
    rclient = new RestHighLevelClient(
        RestClient.builder(new HttpHost(server.getAddress(), Integer.valueOf(server.getPort()), "http"))
            .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
              public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
              }
            }));
  }
  client = rclient;
  /*
   * With the upgrade to elasticsearch 6.3.0, removed the NodeBuilder,
   * which was removed from the elasticsearch 5.0 API, see:
   * https://www.elastic.co/guide/en/elasticsearch/reference/5.0/breaking_50_java_api_changes.html#_nodebuilder_removed
   */
}
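Step (3) above, testing that the cluster and index are reachable, is not part of the method shown. A minimal sketch of such a connectivity check with the high-level REST client; testConnection is a hypothetical helper name, not the plugin's actual method.

// Hypothetical sketch of a connectivity test, not the plugin's actual code.
private boolean testConnection() {
  try {
    // ping() returns true when the cluster answers the REST call.
    return client.ping(RequestOptions.DEFAULT);
  } catch (IOException e) {
    logError(e.getLocalizedMessage());
    return false;
  }
}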
3.3.3 Data Stream Processing
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
  Object[] rowData = getRow();
  if (rowData == null) {
    if (currentRequest != null && currentRequest.numberOfActions() > 0) {
      // didn't fill a whole batch
      processBatch(false);
    }
    setOutputDone();
    return false;
  }
  if (first) {
    first = false;
    setupData();
    // The REST client has no prepareBulk(); assuming the batch starts with a fresh BulkRequest here.
    currentRequest = new BulkRequest();
    requestsBuffer = new ArrayList<IndexRequest>(this.batchSize);
    initFieldIndexes();
  }
  try {
    data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
    return indexRow(data.inputRowMeta, rowData) || !stopOnError;
  } catch (KettleStepException e) {
    throw e;
  } catch (Exception e) {
    rejectAllRows(e.getLocalizedMessage());
    String msg = BaseMessages.getString(PKG, "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage());
    logError(msg);
    throw new KettleStepException(msg, e);
  }
}
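As in the es6 version, the actual indexing of each row happens in indexRow, which is not shown. A plausible sketch for the REST-client version, reusing the step's currentRequest, requestsBuffer, and batchSize fields; rowToJsonMap and buildIndexName are hypothetical helpers, as before.

// Hypothetical sketch of indexRow for the REST client, not the plugin's actual code.
private boolean indexRow(RowMetaInterface rowMeta, Object[] row) throws KettleStepException {
  try {
    // Convert the Kettle row into a field -> value map (hypothetical helper).
    Map<String, Object> source = rowToJsonMap(rowMeta, row);

    // One IndexRequest per row, aimed at the dynamic daily index.
    IndexRequest request = new IndexRequest(buildIndexName()).source(source);
    requestsBuffer.add(request);
    currentRequest.add(request);

    // Flush the batch once the configured size is reached.
    if (currentRequest.numberOfActions() >= batchSize) {
      return processBatch(true);
    }
    return true;
  } catch (Exception e) {
    throw new KettleStepException(e);
  }
}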
3.3.4 Batch Processing
private boolean processBatch(boolean makeNew) throws KettleStepException {
  BulkResponse response = null;
  try {
    // The high-level REST client executes the bulk request synchronously over HTTP.
    response = client.bulk(currentRequest, RequestOptions.DEFAULT);
  } catch (IOException e1) {
    rejectAllRows(e1.getLocalizedMessage());
    String msg = BaseMessages.getString(PKG, "ElasticSearchBulk.Log.Exception", e1.getLocalizedMessage());
    logError(msg);
    throw new KettleStepException(msg, e1);
  }
  boolean responseOk = false;
  if (response != null) {
    responseOk = handleResponse(response);
    requestsBuffer.clear();
  } else { // have to assume all failed
    numberOfErrors += currentRequest.numberOfActions();
    setErrors(numberOfErrors);
  }
  if (makeNew) {
    // Start a fresh BulkRequest for the next batch (the REST equivalent of client.prepareBulk()).
    currentRequest = new BulkRequest();
    data.nextBufferRowIdx = 0;
    data.inputRowBuffer = new Object[batchSize][];
  } else {
    currentRequest = null;
    data.inputRowBuffer = null;
  }
  return responseOk;
}
Whichever Elasticsearch version the server-side cluster runs, the client must have the same major version (for example 6.x or 7.x) as the nodes in the cluster.
4. ElasticSearch Bulk Insert Usage Guide
4.1 General Parameters
① Index: dynamic index field, built as index prefix + dynamic date
② Type: defaults to _doc
③ Test Index: check online whether the index exists
④ Batch Size: number of rows per batch
⑤ Stop on error: whether to stop when an error occurs
⑥ Batch Timeout: timeout for writing one batch, in seconds
⑦ Id Field: the document ID, doc_id
⑧ Overwrite if exists: whether to overwrite a document that already exists
⑨ Output Rows: output the rows to the next step

4.2 Servers Parameters
① Address: list of Elasticsearch cluster node addresses
② Port: the corresponding port number

4.3 Fields (Output Fields)
① Name: field from the data stream
② Target Name: the target mapping field of the corresponding index on the Elasticsearch cluster

4.4 Settings Parameters
① cluster.name: cluster name
② es.user: username for Elasticsearch authentication (custom parameter name)
③ es.password: password for Elasticsearch authentication (custom parameter name)
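These keys are free-form settings that initClient copies into the Settings.Builder; es.user and es.password are then read back to build the credentials (see section 3.3.2). A small illustrative snippet with placeholder values; my-cluster, elastic, and changeme are examples, not values from the plugin.

// Illustrative only: the same keys as on the Settings tab, with placeholder values.
Settings.Builder settingsBuilder = Settings.builder()
    .put("cluster.name", "my-cluster")   // ① cluster name
    .put("es.user", "elastic")           // ② custom key, read back for authentication
    .put("es.password", "changeme");     // ③ custom key, read back for authentication

// initClient() then calls settingsBuilder.get("es.user") / get("es.password")
// to build the BasicCredentialsProvider shown in section 3.3.2.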

5. Summary
If you need the source code, want to learn more about custom plugins and integration approaches, or have any questions or suggestions from developing or using them, follow the WeChat official account "游走在数据之间": reply 2 for the source code, or 3 for the introductory videos. You can also join the Kettle discussion group "话说Kettle".




