1、avro source
Avro Source可以定制avro-client发送一个指定的文件给Flume agent,Avro源使用Avro RPC机制,Flume主要的RPC Source也是 Avro Source,它使用Netty-Avro inter-process的通信(IPC)协议来通信,因此可以用java或JVM语言发送数据到Avro Source端。它的配置文件主要包含三个参数:
type: Avro source的别名是avro,也可以使用完整类别名称,org.apache.flume.source.AvroSource;
bind: 绑定的IP地址或主机名。使用0.0.0.0绑定机器所有端口
port: 绑定监听端口端口
1、通过avro-client发送文件数据
flume-ng自带avro-client命令,可以将指定的文件,通过avro rpc发送到指定的avro源。
当输入flume-ng help命令后,就可以查看到这个avro-client命令:
[isoft@server101 flume-1.9.0]$ bin/flume-ng help
Usage: bin/flume-ng <command> [options]...
commands:
help display this help text
agent run a Flume agent
avro-client run an avro Flume client
version show Flume version info
avro source配置示例:
#配置三个源
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置r1的输入源为avro
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4141
#配置channels的类型为memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#配置sinks的目标为logger
a1.sinks.k1.type=logger
#组织三个组件
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
现在启动avro source:
$ flume-ng agent -n a1 -c conf/ --conf-file conf/01_avro.conf \
-Dflume.root.logger=INFO,consol
然后使用flume-ng avro-client向4141端口发送文件数据:
$flume-ng avro-client -c conf/ -H 127.0.0.1 -p 4141 -F hello.txt
现在就可以查看日志log/flume.log中的内容:

2、Java api向avroi源发送数据
同样,也可以使用Java代码,向avro源发送数据。可以通过官方地址获取参考代码。官方地址:
http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift
使用上面的配置文件,并启动agent。
首先,需要添加依赖:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
开发一个基本的测试:
@Test
public void sendData() throws Exception {
String host = "server101";
int port = 4141;
RpcClient client = RpcClientFactory.getDefaultInstance(host,port);
//发送数据
Event event = EventBuilder.withBody("Hello", Charset.forName("UTF-8"));
client.append(event);
client.close();//关闭
}
上面的代码中,通过client.append即会将Event发送到avro source源上去。
连续发送event示例:
/** * 测试接收彷的输入连续输入 */
public static void main(String[] args) throws Exception {
String host = "server101";
int port = 4141;
RpcClient client = RpcClientFactory.getDefaultInstance(host,port);
//连续输入数据
Scanner sc = new Scanner(System.in);
while(true){
String str = sc.nextLine();
if(str.equals("exit")){
break;
}
Event event = EventBuilder.withBody(str, Charset.forName("UTF-8"));
client.append(event);
}
client.close();
}
启动Java程序并连续入数据:

查看flume agent接收的数据在log中的输出:

源代码说明:
在flume 1.4以后,连接avro source或是连接thirft source都可以使得RpcClient。RpcClient的两个主要子类为:
ThriftRpcClient和nettyAvroRpcClient。如下图所示。

上面的代码,由于连接是Avro source源,所以返回的对象为:NettyAvroRpcClient。即,代码:RpcClient client = RpcClientFactory.getDefaultInstance(host,port);
返回的是:NettyAvroRpcClient。
3、开发一个netcat源向avro发送数据
以下开发一个示例配置,将配置两个agent,两个agent中间,通过avro rpc进行通讯。配置图示如下:

修改avro.conf内容如下:
#配置agent a0
a0.sources=r0
a0.channels=c0
a0.sinks=k0
a0.sources.r0.type=netcat
a0.sources.r0.bind=0.0.0.0
a0.sources.r0.port=4040
a0.channels.c0.type=memory
a0.channels.c0.capacity=1000
a0.channels.c0.transactionCapacity=100
#配置sink为avro并向一个固定的ip和端口号4141
a0.sinks.k0.type=avro
a0.sinks.k0.hostname=127.0.0.1
a0.sinks.k0.port=4141
a0.sources.r0.channels=c0
a0.sinks.k0.channel=c0
############################
#配置agent a1,内容同上直接copy
a1.sources=r1
a1.channels=c1
a1.sinks=k1
#配置source为avro,监听本机的4141端口
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4141
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
注意上面的4141,就是指上面的avro接收数据的端口。
现在启动agent a1:
$ bin/flume-ng agent-na1-cconf/--conf-fileconf/avro.conf \
-Dflume.root.logger=INFO,console
然后再启动agent a0:
$bin/flume-ng agent -na0-cconf/--conf-fileconf/avro.conf \
-Dflume.root.logger=INFO,console
现在就可以使用telnet登录向这个netcat发送数据了,然后这个数据会通过avro rpc发送到avro的源,最后输出到日志文件中。
以下是向agent a0发送数据:
[isoft@server101 ~]$ telnet localhost 4040
Trying ::1...
Connected to localhost.
Escape character is '^]'.
Jack
OK
以下是agent a1输出到日志的数据:
{ headers:{} body: 4A 61 63 6B 0D Jack. }
{ headers:{} body: 4A 61 63 6B 0D Jack. }
{ headers:{} body: 4A 61 63 6B 0D Jack. }
到此两个agent 通过avro rpc通讯的示例,完成。




