背景
接着上文后续,本文主要介绍使用flume采集sftp服务器指定目录数据到kafka集群。原生flume还是不支持sftp-source,老规矩,还是使用第三方插件,地址: https://github.com/keedio/flume-ftp-source

如图所示:需要使用额外的依赖jar:commons-net-3.3.jar、jsch-0.1.54.jar。
sftp服务器搭建
linux系统默认就是一个sftp服务器,可以不做任何配置,仅仅增加一个用户甚至直接使用root用户就行。不过,安全性差,仅用于测试。
如果生产环境考虑安全性,需要配置/etc/ssh/sshd_config,重启sshd服务。详见文章: https://www.cnblogs.com/felixzh/p/15384466.html
flume-ftp-source插件
下载源码编译对应flume1.9.0的插件jar
git clone https://github.com/keedio/flume-ftp-source.gitmvn clean package –DskipTests –Dflume.version=1.9.0
将打包结果jar和上述依赖jar拷贝到flume/lib文件夹

废话少说,直接上配置文件
[root@felixzh apache-flume-1.9.0-bin]# catsftp-flume-kafka.confagent.sources = sftp-sourceagent.sinks = k1agent.channels = chagent.sources.sftp-source.type =org.keedio.flume.source.ftp.source.Sourceagent.sources.sftp-source.client.source =sftpagent.sources.sftp-source.name.server =felixzh1agent.sources.sftp-source.user = rootagent.sources.sftp-source.password =felixzhagent.sources.sftp-source.port = 22#sftp服务器需要采集的目录agent.sources.sftp-source.working.directory= home/bigdata/testData#flume节点状态数据目录agent.sources.sftp-source.folder = tmp/agent.sources.sftp-source.file.name =ftp2-status-file.seragent.sources.sftp-source.run.discover.delay=5000agent.sources.sftp-source.flushlines = trueagent.sources.sftp-source.search.recursive= trueagent.sources.sftp-source.processInUse =falseagent.sources.sftp-source.processInUseTimeout= 30agent.sources.sftp-source.strictHostKeyChecking= noagent.sources.sftp-source.knownHosts =~/.ssh/known_hostsagent.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSinkagent.sinks.k1.topic = sftp-flumeagent.sinks.k1.brokerList = felixzh1:9092agent.sinks.k1.batchsize = 200agent.sinks.kafkaSink.requiredAcks=1agent.sinks.k1.serializer.class =kafka.serializer.StringEncoderagent.sinks.kafkaSink.zookeeperConnect=felixzh1:2181agent.channels.ch.type = memoryagent.channels.ch.capacity = 10000agent.channels.ch.transactionCapacity =10000agent.channels.hbaseC.keep-alive = 20agent.sources.sftp-source.channels = chagent.sinks.k1.channel = ch
配置参数列表

m : stands for parameter is mandatory for above sourceo : optionalx : not available
创建kafka测试topic
[root@felixzh1 kafka_2.12-2.7.1]#bin/kafka-topics.sh --zookeeper felixzh1:2181 --topic sftp-flume --create--partitions 1 --replication-factor 1Created topic sftp-flume.
Sftp服务器准备测试数据

启动flume任务
[root@felixzh apache-flume-1.9.0-bin]#./bin/flume-ng agent -n agent -c conf/ -f sftp-flume-kafka.conf-Dflume.root.logger=INFO,console

消费kafka测试topic
[root@felixzh1 kafka_2.12-2.7.1]#bin/kafka-console-consumer.sh --bootstrap-server felixzh1:9092 --topicsftp-flume --from-beginning

文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




