暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flume采集sftp服务器目录到Kafka实战

大数据从业者 2021-10-11
2874

背景

接着上文后续,本文主要介绍使用flume采集sftp服务器指定目录数据到kafka集群。原生flume还是不支持sftp-source,老规矩,还是使用第三方插件,地址: https://github.com/keedio/flume-ftp-source


如图所示:需要使用额外的依赖jar:commons-net-3.3.jar、jsch-0.1.54.jar。

 

sftp服务器搭建

linux系统默认就是一个sftp服务器,可以不做任何配置,仅仅增加一个用户甚至直接使用root用户就行。不过,安全性差,仅用于测试。

如果生产环境考虑安全性,需要配置/etc/ssh/sshd_config,重启sshd服务。详见文章: https://www.cnblogs.com/felixzh/p/15384466.html

 

flume-ftp-source插件

下载源码编译对应flume1.9.0的插件jar

    git clone https://github.com/keedio/flume-ftp-source.git
    mvn clean package –DskipTests –Dflume.version=1.9.0

    将打包结果jar和上述依赖jar拷贝到flume/lib文件夹


    废话少说,直接上配置文件

      [root@felixzh apache-flume-1.9.0-bin]# catsftp-flume-kafka.conf
      agent.sources = sftp-source
      agent.sinks = k1
      agent.channels = ch

      agent.sources.sftp-source.type =org.keedio.flume.source.ftp.source.Source
      agent.sources.sftp-source.client.source =sftp
      agent.sources.sftp-source.name.server =felixzh1
      agent.sources.sftp-source.user = root
      agent.sources.sftp-source.password =felixzh
      agent.sources.sftp-source.port = 22
      #sftp服务器需要采集的目录
      agent.sources.sftp-source.working.directory= home/bigdata/testData
      #flume节点状态数据目录
      agent.sources.sftp-source.folder = tmp/
      agent.sources.sftp-source.file.name =ftp2-status-file.ser
      agent.sources.sftp-source.run.discover.delay=5000
      agent.sources.sftp-source.flushlines = true
      agent.sources.sftp-source.search.recursive= true
      agent.sources.sftp-source.processInUse =false
      agent.sources.sftp-source.processInUseTimeout= 30

      agent.sources.sftp-source.strictHostKeyChecking= no
      agent.sources.sftp-source.knownHosts =~/.ssh/known_hosts

      agent.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink
      agent.sinks.k1.topic = sftp-flume
      agent.sinks.k1.brokerList = felixzh1:9092
      agent.sinks.k1.batchsize = 200
      agent.sinks.kafkaSink.requiredAcks=1
      agent.sinks.k1.serializer.class =kafka.serializer.StringEncoder
      agent.sinks.kafkaSink.zookeeperConnect=felixzh1:2181
      agent.channels.ch.type = memory
      agent.channels.ch.capacity = 10000
      agent.channels.ch.transactionCapacity =10000
      agent.channels.hbaseC.keep-alive = 20

      agent.sources.sftp-source.channels = ch
      agent.sinks.k1.channel = ch

      配置参数列表

        m : stands for parameter is mandatory for above source
        o : optional
        x : not available

         

        创建kafka测试topic

          [root@felixzh1 kafka_2.12-2.7.1]#bin/kafka-topics.sh --zookeeper felixzh1:2181 --topic sftp-flume --create--partitions 1 --replication-factor 1
          Created topic sftp-flume.


          Sftp服务器准备测试数据


          启动flume任务

            [root@felixzh apache-flume-1.9.0-bin]#./bin/flume-ng agent -n agent -c conf/ -f sftp-flume-kafka.conf-Dflume.root.logger=INFO,console


            消费kafka测试topic

              [root@felixzh1 kafka_2.12-2.7.1]#bin/kafka-console-consumer.sh --bootstrap-server felixzh1:9092 --topicsftp-flume --from-beginning


              文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论