暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

NIFI的静态单元测试(一)

NIFI实战 2021-06-04
1768

随着项目越来越复杂,质量越来越得不到保证。依赖自动化环境,集成测试,后知后觉,质量无法得到本质提升。追本溯源,在写代码的时候就能发现并处理掉异常是最有效的。


NIFI提供两种便于我们自测代码

  1. 远程debug,nifi项目目录下的conf下的bootstrap.conf第39行打开,可以通过ideal的remote打开8000端口进行debug。

  2. 引用nifi-mock进行单元测试

    <dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-mock</artifactId>
       <version>${nifi.version}</version>
    <scope>test</scope>
    </dependency>

    下面进入今天正题写NIFI静态单元测试场景

    最简单的条用场景

      // 创建一个处理器的TestRunner
      final TestRunner runner = TestRunners.newTestRunner(new 处理器类());
      // 设置没有上游输入
      runner.setNonLoopConnection(false);
      // 设置不适用EL表达式验证配置
      runner.setValidateExpressionUsage(false);
      // 设置熟悉
      runner.setProperty(GetFile.DIRECTORY(处理器属性), 属性塞值);
      // 添加一个controller service
      runner.addControllerService("dbcp", ${controller service对象}, ${属性});
      // 启用controller service
      runner.enableControllerService(${controller service对象});
      // 添加controller service例子
      runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "dbcp");
      // 模拟flowifle的属性
      final Map<String, String> attributes = new HashMap<>();
      attributes.put("foo", "bar");
      // 模拟队列设值,并添加属性
      runner.enqueue("Hello Joe".getBytes(), attributes);
      // 模拟上游队列数据
      runner.enqueue("test".getBytes());
      // 模拟处理器调用 shedule->trigger,shedule失败会调用unshedule
      runner.run();
      // 验证路由结果数量
      runner.assertAllFlowFilesTransferred(处理器路由关系, 1);
      // 从成功xx处理器的结果xx路由关系,获取数据进行验证
      final MockFlowFile helloFF = runner.getFlowFilesForRelationship(xx处理器.REL_SUCCESS).get(0);
      helloFF.assertContentEquals("hello");

      备注:

      1. runner.enqueue("test".getBytes());写一次相当于上游队列路由一条flowfile过来。

      2. runner.run()相当于走一次trigger,会先走schedule,如果schedule异常直接进入unschedule。正常逻辑schedule->trigger,异常场景schedule->unschedule.

      例子一:测试一个controller service

        import com.metamx.tranquility.tranquilizer.Tranquilizer;
        import org.apache.curator.framework.CuratorFramework;
        import org.apache.nifi.components.PropertyDescriptor;
        import org.apache.nifi.controller.api.druid.DruidTranquilityService;
        import org.apache.nifi.processor.AbstractProcessor;
        import org.apache.nifi.processor.ProcessContext;
        import org.apache.nifi.processor.ProcessSession;
        import org.apache.nifi.processor.exception.ProcessException;
        import org.apache.nifi.util.TestRunner;
        import org.apache.nifi.util.TestRunners;
        import org.junit.Before;
        import org.junit.Test;
        import java.util.ArrayList;
        import java.util.List;
        import static org.mockito.Mockito.mock;


        public class DruidTranquilityControllerTest {
            // 模拟器
        private TestRunner runner;
        // 模拟controller service
        private MockDruidTranquilityController service;
            // 调度test函数之前除非
        @Before
        public void setup() throws Exception {
             // 初始化模拟器
        runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);
                // 模拟一个controller service
        service = new MockDruidTranquilityController();
        // 给TestControllerServiceProcessor处理器设置controller service
        runner.addControllerService("Client Service", service);
        }
            // 验证校验
        @Test
            public void testValid() {
        runner.assertNotValid(service);
        runner.setProperty(service, DruidTranquilityController.DATASOURCE, "test");
        runner.assertNotValid(service);
        runner.setProperty(service, DruidTranquilityController.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
        runner.assertNotValid(service);
        runner.setProperty(service, DruidTranquilityController.AGGREGATOR_JSON, "[{\"type\": \"count\", \"name\": \"count\"}]");
        runner.assertValid(service);
        }


        public static class MockDruidTranquilityController extends DruidTranquilityController {


        Tranquilizer t = mock(Tranquilizer.class);
        CuratorFramework c = mock(CuratorFramework.class);


        @Override
        public Tranquilizer getTranquilizer() {
        return t;
        }


        @Override
        CuratorFramework getCurator(String zkConnectString) {
        return c;
        }
        }


        public static class TestControllerServiceProcessor extends AbstractProcessor {


        static final PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
        .name("Client Service")
        .description("DruidTranquilityService")
        .identifiesControllerService(DruidTranquilityService.class)
        .required(true)
        .build();


        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        }


        @Override
        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
        propertyDescriptors.add(CLIENT_SERVICE);
        return propertyDescriptors;
        }
        }


        }

        例子二:测试一个Processor

          public class JExecuteSQLTest {
          private static final Logger LOGGER;


          static {
          LOGGER = LoggerFactory.getLogger(JExecuteSQLTest.class);
          }


          @BeforeClass
          public static void setupClass() {




          }


          private TestRunner runner;


          class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {


          @Override
          public String getIdentifier() {
          return "dbcp";
          }


          @Override
          public Connection getConnection() throws ProcessException {
          try {
          Class.forName("com.mysql.jdbc.Driver");
          final Connection con =
          DriverManager.getConnection("jdbc:mysql://192.168.42.6:3306/datax",
          "root", "123456");
          return con;
          } catch (final Exception e) {
          throw new ProcessException("getConnection failed: " + e);
          }
          }


          @Override
          public Connection getConnection(Map<String, String> attributes) throws ProcessException {
          return getConnection();
          }
          }
              // 模拟处理器设置controller service
          @Before
          public void setup() throws InitializationException {
          final DBCPService dbcp = new DBCPServiceSimpleImpl();
          final Map<String, String> dbcpProperties = new HashMap<>();
          runner = TestRunners.newTestRunner(JExecuteSQL.class);
          runner.addControllerService("dbcp", dbcp, dbcpProperties);
          runner.enableControllerService(dbcp);
          runner.setProperty(JExecuteSQL.DBCP_SERVICE, "dbcp");
          }
              // 设置属性并执行调用
          @Test
          public void testTrigger() {
                  runner.setProperty(JExecuteSQL.SQL_SELECT_QUERY, "SELECT * FROM `achievement_source` where id >:slot order by id limit 10");
          runner.setProperty(JExecuteSQL.LAST_RUN_METADATA_PATH, "E:\\nifi1.1.4\\nifi-nar-custom-extension\\nifi-custom-nar-bundles\\nifi-custom-common-bundle\\nifi-custom-common\\src\\test\\java\\com\\github\\nifi\\common\\test.txt");
          runner.setProperty(JExecuteSQL.MAX_VALUE_COLUMN_NAMES, "id");
          runner.setNonLoopConnection(false);
          runner.setValidateExpressionUsage(false);
          runner.run();
          // runner.assertAllFlowFilesTransferred(JExecuteSQL.REL_FAILURE, 0);
          runner.assertAllFlowFilesTransferred(JExecuteSQL.REL_SUCCESS, 1);
          }


          }
          文章转载自NIFI实战,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论