暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flume:avro source

Coding On Road 2019-10-10
1360

1、avro source

     Avro Source可以定制avro-client发送一个指定的文件给Flume agentAvro源使用Avro RPC机制,Flume主要的RPC Source也是 Avro Source,它使用Netty-Avro inter-process的通信(IPC)协议来通信,因此可以用javaJVM语言发送数据到Avro Source端。它的配置文件主要包含三个参数:

type  Avro source的别名是avro,也可以使用完整类别名称,org.apache.flume.source.AvroSource

bind  绑定的IP地址或主机名。使用0.0.0.0绑定机器所有端口

port  绑定监听端口端口

 

1、通过avro-client发送文件数据

flume-ng自带avro-client命令,可以将指定的文件,通过avro rpc发送到指定的avro源。

当输入flume-ng help命令后,就可以查看到这个avro-client命令:

[isoft@server101 flume-1.9.0]$ bin/flume-ng help

Usage: bin/flume-ng <command> [options]...

commands:

  help                      display this help text

  agent                     run a Flume agent

  avro-client               run an avro Flume client

  version                   show Flume version info

 

avro source配置示例:

#配置三个源

a1.sources=r1

a1.channels=c1

a1.sinks=k1

#配置r1的输入源为avro

a1.sources.r1.type=avro

a1.sources.r1.bind=0.0.0.0

a1.sources.r1.port=4141

#配置channels的类型为memory

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

#配置sinks的目标为logger

a1.sinks.k1.type=logger

#组织三个组件

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

 

现在启动avro source

$ flume-ng agent -n a1 -c conf/ --conf-file conf/01_avro.conf \

-Dflume.root.logger=INFO,consol

 

然后使用flume-ng avro-client4141端口发送文件数据:

$flume-ng avro-client -c conf/ -H 127.0.0.1 -p 4141 -F hello.txt

 

现在就可以查看日志log/flume.log中的内容:

 


 

2、Java api向avroi源发送数据

同样,也可以使用Java代码,向avro源发送数据。可以通过官方地址获取参考代码。官方地址:

http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift

 使用上面的配置文件,并启动agent

首先,需要添加依赖:

<dependency>

    <groupId>org.apache.flume</groupId>

    <artifactId>flume-ng-core</artifactId>

    <version>1.9.0</version>

</dependency>

开发一个基本的测试:

@Test

public void sendData() throws Exception {

    String host = "server101";

    int port = 4141;

    RpcClient client = RpcClientFactory.getDefaultInstance(host,port);

    //发送数据    

Event event =  EventBuilder.withBody("Hello", Charset.forName("UTF-8"));

    client.append(event);

    client.close();//关闭

}

上面的代码中,通过client.append即会将Event发送到avro source源上去。

连续发送event示例:

/** * 测试接收彷的输入连续输入 */

public static void main(String[] args) throws Exception {

    String host = "server101";

    int port = 4141;

    RpcClient client = RpcClientFactory.getDefaultInstance(host,port);

    //连续输入数据   

 Scanner sc = new Scanner(System.in);

    while(true){

        String str = sc.nextLine();

        if(str.equals("exit")){

            break;

        }

        Event event =  EventBuilder.withBody(str, Charset.forName("UTF-8"));

        client.append(event);

    }

    client.close();

}

启动Java程序并连续入数据:

  


查看flume agent接收的数据在log中的输出:

 

 

源代码说明:

flume 1.4以后,连接avro source或是连接thirft source都可以使得RpcClientRpcClient的两个主要子类为:

ThriftRpcClientnettyAvroRpcClient。如下图所示。

 


 上面的代码,由于连接是Avro source源,所以返回的对象为:NettyAvroRpcClient。即,代码:RpcClient client = RpcClientFactory.getDefaultInstance(host,port);

返回的是:NettyAvroRpcClient

 

3、开发一个netcat源向avro发送数据

以下开发一个示例配置,将配置两个agent,两个agent中间,通过avro rpc进行通讯。配置图示如下:

 

 

修改avro.conf内容如下:

#配置agent a0

a0.sources=r0

a0.channels=c0

a0.sinks=k0

 

a0.sources.r0.type=netcat

a0.sources.r0.bind=0.0.0.0

a0.sources.r0.port=4040

 

a0.channels.c0.type=memory

a0.channels.c0.capacity=1000

a0.channels.c0.transactionCapacity=100

#配置sinkavro并向一个固定的ip和端口号4141

a0.sinks.k0.type=avro

a0.sinks.k0.hostname=127.0.0.1

a0.sinks.k0.port=4141

 

a0.sources.r0.channels=c0

a0.sinks.k0.channel=c0

############################

#配置agent a1,内容同上直接copy

a1.sources=r1

a1.channels=c1

a1.sinks=k1

#配置sourceavro,监听本机的4141端口

a1.sources.r1.type=avro

a1.sources.r1.bind=0.0.0.0

a1.sources.r1.port=4141

 

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

 

a1.sinks.k1.type=logger

 

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

 

注意上面的4141,就是指上面的avro接收数据的端口。

现在启动agent a1

$ bin/flume-ng  agent-na1-cconf/--conf-fileconf/avro.conf \

-Dflume.root.logger=INFO,console   

然后再启动agent a0

$bin/flume-ng  agent  -na0-cconf/--conf-fileconf/avro.conf \

-Dflume.root.logger=INFO,console

 

现在就可以使用telnet登录向这个netcat发送数据了,然后这个数据会通过avro rpc发送到avro的源,最后输出到日志文件中。

以下是向agent a0发送数据:

[isoft@server101 ~]$ telnet localhost 4040

Trying ::1...

Connected to localhost.

Escape character is '^]'.

Jack

OK

以下是agent a1输出到日志的数据:

{ headers:{} body: 4A 61 63 6B 0D                                  Jack. }

{ headers:{} body: 4A 61 63 6B 0D                                  Jack. }

{ headers:{} body: 4A 61 63 6B 0D                                  Jack. }

到此两个agent 通过avro rpc通讯的示例,完成。

 


文章转载自Coding On Road,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论