随着项目越来越复杂,质量越来越得不到保证。依赖自动化环境,集成测试,后知后觉,质量无法得到本质提升。追本溯源,在写代码的时候就能发现并处理掉异常是最有效的。
NIFI提供两种便于我们自测代码的方式:
远程debug:将nifi项目目录下conf/bootstrap.conf第39行(java.arg.debug远程调试配置)取消注释,然后通过IDEA的Remote调试配置连接8000端口进行debug。
引用nifi-mock进行单元测试
<!-- nifi-mock supplies the TestRunner/TestRunners test harness; it is only needed on the test classpath. -->
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-mock</artifactId>
    <version>${nifi.version}</version>
    <scope>test</scope>
</dependency>
下面进入今天的正题:编写NIFI单元测试的常见场景。
最简单的调用场景
// 创建一个处理器的TestRunnerfinal TestRunner runner = TestRunners.newTestRunner(new 处理器类());// 设置没有上游输入runner.setNonLoopConnection(false);// 设置不适用EL表达式验证配置runner.setValidateExpressionUsage(false);// 设置熟悉runner.setProperty(GetFile.DIRECTORY(处理器属性), 属性塞值);// 添加一个controller servicerunner.addControllerService("dbcp", ${controller service对象}, ${属性});// 启用controller servicerunner.enableControllerService(${controller service对象});// 添加controller service例子runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "dbcp");// 模拟flowifle的属性final Map<String, String> attributes = new HashMap<>();attributes.put("foo", "bar");// 模拟队列设值,并添加属性runner.enqueue("Hello Joe".getBytes(), attributes);// 模拟上游队列数据runner.enqueue("test".getBytes());// 模拟处理器调用 shedule->trigger,shedule失败会调用unshedulerunner.run();// 验证路由结果数量runner.assertAllFlowFilesTransferred(处理器路由关系, 1);// 从成功xx处理器的结果xx路由关系,获取数据进行验证final MockFlowFile helloFF = runner.getFlowFilesForRelationship(xx处理器.REL_SUCCESS).get(0);helloFF.assertContentEquals("hello");
备注:
runner.enqueue("test".getBytes());写一次相当于上游队列路由一条flowfile过来。
runner.run()相当于走一次trigger:会先走schedule(@OnScheduled),如果schedule异常则直接进入unschedule(@OnUnscheduled),不再执行trigger。正常逻辑schedule->trigger(默认情况下trigger执行完后还会依次调用unschedule和stop),异常场景schedule->unschedule。
例子一:测试一个controller service
import com.metamx.tranquility.tranquilizer.Tranquilizer;import org.apache.curator.framework.CuratorFramework;import org.apache.nifi.components.PropertyDescriptor;import org.apache.nifi.controller.api.druid.DruidTranquilityService;import org.apache.nifi.processor.AbstractProcessor;import org.apache.nifi.processor.ProcessContext;import org.apache.nifi.processor.ProcessSession;import org.apache.nifi.processor.exception.ProcessException;import org.apache.nifi.util.TestRunner;import org.apache.nifi.util.TestRunners;import org.junit.Before;import org.junit.Test;import java.util.ArrayList;import java.util.List;import static org.mockito.Mockito.mock;public class DruidTranquilityControllerTest {// 模拟器private TestRunner runner;// 模拟controller serviceprivate MockDruidTranquilityController service;// 调度test函数之前除非@Beforepublic void setup() throws Exception {// 初始化模拟器runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);// 模拟一个controller serviceservice = new MockDruidTranquilityController();// 给TestControllerServiceProcessor处理器设置controller servicerunner.addControllerService("Client Service", service);}// 验证校验@Testpublic void testValid() {runner.assertNotValid(service);runner.setProperty(service, DruidTranquilityController.DATASOURCE, "test");runner.assertNotValid(service);runner.setProperty(service, DruidTranquilityController.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");runner.assertNotValid(service);runner.setProperty(service, DruidTranquilityController.AGGREGATOR_JSON, "[{\"type\": \"count\", \"name\": \"count\"}]");runner.assertValid(service);}public static class MockDruidTranquilityController extends DruidTranquilityController {Tranquilizer t = mock(Tranquilizer.class);CuratorFramework c = mock(CuratorFramework.class);@Overridepublic Tranquilizer getTranquilizer() {return t;}@OverrideCuratorFramework getCurator(String zkConnectString) {return c;}}public static class TestControllerServiceProcessor extends AbstractProcessor {static final PropertyDescriptor 
CLIENT_SERVICE = new PropertyDescriptor.Builder().name("Client Service").description("DruidTranquilityService").identifiesControllerService(DruidTranquilityService.class).required(true).build();@Overridepublic void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {}@Overrideprotected List<PropertyDescriptor> getSupportedPropertyDescriptors() {List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();propertyDescriptors.add(CLIENT_SERVICE);return propertyDescriptors;}}}
例子二:测试一个Processor
public class JExecuteSQLTest {private static final Logger LOGGER;static {LOGGER = LoggerFactory.getLogger(JExecuteSQLTest.class);}@BeforeClasspublic static void setupClass() {}private TestRunner runner;class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {@Overridepublic String getIdentifier() {return "dbcp";}@Overridepublic Connection getConnection() throws ProcessException {try {Class.forName("com.mysql.jdbc.Driver");final Connection con =DriverManager.getConnection("jdbc:mysql://192.168.42.6:3306/datax","root", "123456");return con;} catch (final Exception e) {throw new ProcessException("getConnection failed: " + e);}}@Overridepublic Connection getConnection(Map<String, String> attributes) throws ProcessException {return getConnection();}}// 模拟处理器设置controller service@Beforepublic void setup() throws InitializationException {final DBCPService dbcp = new DBCPServiceSimpleImpl();final Map<String, String> dbcpProperties = new HashMap<>();runner = TestRunners.newTestRunner(JExecuteSQL.class);runner.addControllerService("dbcp", dbcp, dbcpProperties);runner.enableControllerService(dbcp);runner.setProperty(JExecuteSQL.DBCP_SERVICE, "dbcp");}// 设置属性并执行调用@Testpublic void testTrigger() {runner.setProperty(JExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM `achievement_source` where id >:slot order by id limit 10");runner.setProperty(JExecuteSQL.LAST_RUN_METADATA_PATH, "E:\\nifi1.1.4\\nifi-nar-custom-extension\\nifi-custom-nar-bundles\\nifi-custom-common-bundle\\nifi-custom-common\\src\\test\\java\\com\\github\\nifi\\common\\test.txt");runner.setProperty(JExecuteSQL.MAX_VALUE_COLUMN_NAMES, "id");runner.setNonLoopConnection(false);runner.setValidateExpressionUsage(false);runner.run();// runner.assertAllFlowFilesTransferred(JExecuteSQL.REL_FAILURE, 0);runner.assertAllFlowFilesTransferred(JExecuteSQL.REL_SUCCESS, 1);}}
文章转载自NIFI实战,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




