暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何给你的 spark streaming 做单元测试

张江打工人 2018-04-08
1156

首发个人公众号  spark技术分享 ,  同步个人网站  coolplayer.net ,未经本人同意,禁止一切转载
我们都知道,从写代码到测试再到上线, 越到后期认真,你付出的时间和精力成本越大,单元测试是阻挡 bug 的第一道防线,不写单元测试的程序员不是一个合格的程序员,有了单元测试后,妈妈再也不用担心我进行重构了,今天我来谈下如何对 spark streaming 来进行单元测试

拿最常见的一个 spark streaming 业务场景来看, spark streaming 消费 kafka 中的数据, 首先你需要 mock 你的数据源, 启动一个内嵌的 zookeeper 和 kafka server

引用相关依赖的jar包

 compile 'org.apache.kafka:kafka-clients:0.10.0.1'
 compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.10.0.1'
 testCompile group: 'org.scalatest', name: 'scalatest_2.11', version: '2.2.6'

好了下面我们启动一个内嵌的 zookeeper 和一个内嵌的 kafka server

brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()

这时候我们有了 mock 的 zk 和 kakfa  server, 下面我们来使用 scalatest 框架写几个测试例子

这个例子中,我创建一个 spark streaming 实例, 使用 kafkaTestUtils 创建一个 topic 并且打入一些json 格式的消息,

val data = Map("{'domain': 'www.a.com', 'size': 6}" -> 7, "{'domain': 'www.b.com', 'size': 7}" -> 9)
kafkaTestUtils.createTopic(topic)
kafkaTestUtils.sendMessages(topic, data)

后面的数字代表打入本条消息重复打入的次数
创建一个 DirectKafkaInputDStream, 读取 kafka 中的消息,转换成 json 格式, 转换为 Dataset, 使用sql语句进行聚合,在外部定义一个变量,用来累加每次聚合的结果,
最后判断总的聚合结果是否等于期待值

eventually(timeout(40000.milliseconds), interval(200.milliseconds)) {
   assert(resultresult.get("www.a.com") == 42L)
   assert(resultresult.get("www.b.com") == 63L)
}

这里使用了了 scalatest 里面的 eventually 方法,也就是我们主线程会一直阻塞在这里,这时候 spark streaming 会不断生成批次job运行, 在40s内,每隔 200 ms 去检测一次,如果等于期待值,就pass, 如果超时也没等于期待值,那么这个 unit test 就fail了。
欢迎关注 spark技术分享

文章转载自张江打工人,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论