
Apache Zeppelin Tutorial Series, Part 5: How the Interpreter Works


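The previous article used JDBCInterpreter as an example to explain how a JDBC query is actually executed. Below is the overall architecture diagram.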

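In essence, the web frontend sends a request to the server. The server calls zengine, zengine calls the interpreter, and the interpreter finally reaches the actual execution module, such as the JDBCInterpreter introduced in the previous article.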
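The layering can be pictured as a chain of delegations. The following is only a minimal sketch. `CallChainSketch`, `handle` and `executionModule` are hypothetical names, not Zeppelin classes. Each layer records that it was visited and then forwards the script to the next layer. `executionModule` stands in for a concrete module such as JDBCInterpreter.

```java
import java.util.function.Function;

// Hypothetical layers that only delegate, mirroring
// web -> server -> zengine -> interpreter -> execution module.
final class CallChainSketch {
    // Innermost layer: a stand-in for a concrete module such as JDBCInterpreter.
    static String executionModule(String script) {
        return "executed: " + script;
    }

    // Builds the chain from the inside out; `trace` records the path taken.
    static String handle(String script, StringBuilder trace) {
        Function<String, String> interpreter = s -> {
            trace.append("interpreter>");
            return executionModule(s);
        };
        Function<String, String> zengine = s -> {
            trace.append("zengine>");
            return interpreter.apply(s);
        };
        Function<String, String> server = s -> {
            trace.append("server>");
            return zengine.apply(s);
        };
        trace.append("web>"); // the web frontend issues the request
        return server.apply(script);
    }
}
```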

This article focuses on the Interpreter module, mainly by walking through the following test class:

zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java

    @Test
    public void testInterpreter2() throws Exception {
        final RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
                RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", "groupId", true);
        server.init(new HashMap<>());
        server.intpEventClient = mock(RemoteInterpreterEventClient.class);

        Map<String, String> intpProperties = new HashMap<>();
        intpProperties.put("property_1", "value_1");
        intpProperties.put("zeppelin.interpreter.localRepo", "/tmp");

        // create Test1Interpreter in session_1
        server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
                intpProperties, "user_1");
        Test1Interpreter interpreter1 = (Test1Interpreter)
                ((LazyOpenInterpreter) server.getInterpreterGroup().get("session_1").get(0))
                        .getInnerInterpreter();
        assertEquals(1, server.getInterpreterGroup().getSessionNum());
        assertEquals(1, server.getInterpreterGroup().get("session_1").size());
        assertEquals(2, interpreter1.getProperties().size());
        assertEquals("value_1", interpreter1.getProperty("property_1"));

        // create Test2Interpreter in session_1
        server.createInterpreter("group_1", "session_1", Test2Interpreter.class.getName(),
                intpProperties, "user_1");
        assertEquals(2, server.getInterpreterGroup().get("session_1").size());

        final RemoteInterpreterContext intpContext = new RemoteInterpreterContext();
        intpContext.setNoteId("note_1");
        intpContext.setParagraphId("paragraph_1");
        intpContext.setGui("{}");
        intpContext.setNoteGui("{}");
        intpContext.setLocalProperties(new HashMap<>());

        // single output of SUCCESS
        RemoteInterpreterResult result = server.interpret("session_1", Test2Interpreter.class.getName(),
                "COMBO_OUTPUT_SUCCESS", intpContext);
        System.out.println(new Gson().toJson(result));
        //List<InterpreterResultMessage> resultMessages = intpContext.out.toInterpreterResultMessage();
        //System.out.println(new Gson().toJson(resultMessages));
        /*assertEquals("SUCCESS", result.code);
        assertEquals(2, result.getMsg().size());
        assertEquals("INTERPRETER_OUT", result.getMsg().get(0).getData());
        assertEquals("SINGLE_OUTPUT_SUCCESS", result.getMsg().get(1).getData());*/
    }

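I made a few small changes to this test method. The original assertions at the end are commented out, and the result is printed as JSON instead.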
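The assertions in this test describe how interpreters are organized. One interpreter group holds sessions, and each session holds a list of interpreters. The sketch below models only that structure and assumes a plain map from session id to list is a fair approximation. `SimpleInterpreterGroup` is a hypothetical class, not Zeppelin's `InterpreterGroup`, although its method names mirror the ones the test calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of an interpreter group: the group holds
// sessions, and each session holds an ordered list of interpreters.
final class SimpleInterpreterGroup<I> {
    private final String id;
    private final Map<String, List<I>> sessions = new HashMap<>();

    SimpleInterpreterGroup(String id) {
        this.id = id;
    }

    String getId() {
        return id;
    }

    // Appends the interpreter to the session, creating the session on first use.
    synchronized void addInterpreterToSession(I interpreter, String sessionId) {
        sessions.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(interpreter);
    }

    // Returns the session's interpreters, or null if the session does not exist.
    synchronized List<I> get(String sessionId) {
        return sessions.get(sessionId);
    }

    synchronized int getSessionNum() {
        return sessions.size();
    }
}
```

Replaying the test's two `createInterpreter` calls against this model leaves one session containing two interpreters. That is what the `getSessionNum()` and `size()` assertions check.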

`createInterpreter` instantiates the Interpreter through reflection. The core code is as follows:

    // Load the interpreter class by its fully qualified name
    Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className);
    Properties p = new Properties();
    p.putAll(properties);
    setSystemProperty(p);

    // The interpreter class must expose a public constructor taking Properties
    Constructor<Interpreter> constructor =
        replClass.getConstructor(new Class[]{Properties.class});
    Interpreter interpreter = constructor.newInstance(p);
    interpreter.setClassloaderUrls(new URL[]{});

    interpreter.setInterpreterGroup(interpreterGroup);
    interpreter.setUserName(userName);

    // Wrap it so that open() is deferred until the interpreter is first used
    interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), sessionId);
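The same reflection pattern can be reproduced with the JDK alone. This is a sketch, not Zeppelin's code. `SketchInterpreter`, `EchoInterpreter`, `LazyOpenSketch` and `ReflectiveFactory` are hypothetical stand-ins for `Interpreter`, a concrete interpreter, `LazyOpenInterpreter` and `createInterpreter`. The sketch shows three steps:

- the class is located by name;
- its `(Properties)` constructor is invoked reflectively;
- `open()` is postponed until the first `interpret()` call.

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.Properties;

// Hypothetical base class standing in for Interpreter: concrete subclasses must
// offer a public constructor taking Properties, as getConstructor() requires.
abstract class SketchInterpreter {
    protected final Properties properties;

    SketchInterpreter(Properties properties) {
        this.properties = properties;
    }

    abstract void open();

    abstract String interpret(String script);
}

// Hypothetical concrete interpreter, created below only through its class name.
class EchoInterpreter extends SketchInterpreter {
    boolean opened;

    public EchoInterpreter(Properties properties) {
        super(properties);
    }

    @Override
    void open() {
        opened = true;
    }

    @Override
    String interpret(String script) {
        return properties.getProperty("prefix", "") + script;
    }
}

// Hypothetical counterpart of LazyOpenInterpreter: open() runs on first use.
class LazyOpenSketch {
    private final SketchInterpreter inner;
    private boolean open;

    LazyOpenSketch(SketchInterpreter inner) {
        this.inner = inner;
    }

    synchronized String interpret(String script) {
        if (!open) {
            inner.open();
            open = true;
        }
        return inner.interpret(script);
    }

    SketchInterpreter getInnerInterpreter() {
        return inner;
    }
}

final class ReflectiveFactory {
    // Mirrors createInterpreter(): class name -> Class -> (Properties) constructor.
    static LazyOpenSketch create(String className, Map<String, String> props) {
        try {
            Class<? extends SketchInterpreter> cls =
                    Class.forName(className).asSubclass(SketchInterpreter.class);
            Properties p = new Properties();
            p.putAll(props);
            Constructor<? extends SketchInterpreter> ctor = cls.getConstructor(Properties.class);
            return new LazyOpenSketch(ctor.newInstance(p));
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

Wrapping the instance in a lazy-open object keeps interpreter creation cheap. `open()` only runs when a paragraph actually executes, which corresponds to the `lazy.isOpen()` check in `jobRun()`.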

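The `interpret` method is where the concrete interpreter actually runs. The request is put into a queue and executed as a job, and the code that ultimately runs is the `jobRun()` method of `InterpretJob`: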

    public InterpreterResult jobRun() throws Throwable {
        ClassLoader currentThreadContextClassloader = Thread.currentThread().getContextClassLoader();
        try {
            InterpreterContext.set(context);
            // clear the result of last run in frontend before running this paragraph.
            context.out.clear();

            InterpreterResult result = null;

            // Open the interpreter instance prior to calling interpret().
            // This is necessary because the earliest we can register a hook
            // is from within the open() method.
            LazyOpenInterpreter lazy = (LazyOpenInterpreter) interpreter;
            if (!lazy.isOpen()) {
                lazy.open();
                result = lazy.executePrecode(context);
            }

            if (result == null || result.code() == Code.SUCCESS) {
                // Add hooks to script from registry.
                // note scope first, followed by global scope.
                // Here's the code after hooking:
                // global_pre_hook
                // note_pre_hook
                // script
                // note_post_hook
                // global_post_hook
                processInterpreterHooks(context.getNoteId());
                processInterpreterHooks(null);
                LOGGER.debug("Script after hooks: {}", script);
                result = interpreter.interpret(script, context);
            }

            // data from context.out is prepended to InterpreterResult if both defined
            context.out.flush();
            List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();

            for (InterpreterResultMessage resultMessage : result.message()) {
                // only add non-empty InterpreterResultMessage
                if (!StringUtils.isBlank(resultMessage.getData())) {
                    resultMessages.add(resultMessage);
                }
            }

            List<String> stringResult = new ArrayList<>();
            for (InterpreterResultMessage msg : resultMessages) {
                if (msg.getType() == InterpreterResult.Type.IMG) {
                    LOGGER.debug("InterpreterResultMessage: IMAGE_DATA");
                } else {
                    LOGGER.debug("InterpreterResultMessage: {}", msg);
                }
                stringResult.add(msg.getData());
            }
            // put result into resource pool
            if (context.getLocalProperties().containsKey("saveAs")) {
                if (stringResult.size() == 1) {
                    LOGGER.info("Saving result into ResourcePool as single string: " +
                            context.getLocalProperties().get("saveAs"));
                    context.getResourcePool().put(
                            context.getLocalProperties().get("saveAs"), stringResult.get(0));
                } else {
                    LOGGER.info("Saving result into ResourcePool as string list: " +
                            context.getLocalProperties().get("saveAs"));
                    context.getResourcePool().put(
                            context.getLocalProperties().get("saveAs"), stringResult);
                }
            }
            return new InterpreterResult(result.code(), resultMessages);
        } catch (Throwable e) {
            return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
        } finally {
            Thread.currentThread().setContextClassLoader(currentThreadContextClassloader);
            InterpreterContext.remove();
        }
    }
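The queue-and-job flow can be sketched with a single-threaded `ExecutorService` acting as the scheduler queue. The sketch also covers the hook ordering described in the comments of `jobRun()`. Everything here is an assumption for illustration:

- `JobQueueSketch`, `addHook` and `submit` are hypothetical names.
- Hooks are simplified to one pre/post string per scope.
- A `null` key stands for the global scope, as in `processInterpreterHooks(null)`.

Note-scoped hooks are applied first and global hooks second. This leaves the global hooks outermost, which yields the order listed in the comments.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

// Hypothetical sketch: requests are queued on a single-threaded executor
// (the "scheduler") and each one runs as a job, loosely like InterpretJob.jobRun().
final class JobQueueSketch {
    // Hypothetical hook registry: key = noteId, or null for the global scope;
    // value = {preHook, postHook}.
    private final Map<String, String[]> hooks = new HashMap<>();
    private final UnaryOperator<String> interpreter; // stands in for interpreter.interpret()
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "job-scheduler");
        t.setDaemon(true);
        return t;
    });
    private boolean open;  // only touched on the scheduler thread
    private int openCount; // how many times "open()" ran

    JobQueueSketch(UnaryOperator<String> interpreter) {
        this.interpreter = interpreter;
    }

    void addHook(String noteIdOrNull, String preHook, String postHook) {
        hooks.put(noteIdOrNull, new String[] {preHook, postHook});
    }

    // Puts the job into the queue and blocks until it has run.
    String submit(String noteId, String script) {
        try {
            return scheduler.submit(() -> jobRun(noteId, script)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    private String jobRun(String noteId, String script) {
        if (!open) { // open lazily, before the first job only
            open = true;
            openCount++;
        }
        // Note scope first, then global scope, so global hooks end up outermost.
        String hooked = wrap(noteId, script);
        hooked = wrap(null, hooked);
        return interpreter.apply(hooked);
    }

    private String wrap(String scope, String script) {
        String[] h = hooks.get(scope);
        return h == null ? script : h[0] + "\n" + script + "\n" + h[1];
    }

    int getOpenCount() {
        return openCount;
    }

    void close() {
        scheduler.shutdown();
    }
}
```

A single-threaded executor runs jobs one at a time in submission order (FIFO). This sketch does not model concurrent scheduling.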

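At this point the code ties back to the JDBC Interpreter code from the previous article. The call `interpreter.interpret(script, context)` is how the Interpreter layer executes a concrete interpreter such as the JDBC Interpreter.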

The `main` method of `RemoteInterpreterServer` starts the server thread. The Thrift server itself is launched inside the `run` method:

    public static void main(String[] args) throws Exception {
        String zeppelinServerHost = null;
        int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
        String portRange = ":";
        String interpreterGroupId = null;
        if (args.length > 0) {
            zeppelinServerHost = args[0];
            port = Integer.parseInt(args[1]);
            interpreterGroupId = args[2];
            if (args.length > 3) {
                portRange = args[3];
            }
        }
        RemoteInterpreterServer remoteInterpreterServer =
                new RemoteInterpreterServer(zeppelinServerHost, port, interpreterGroupId, portRange);
        remoteInterpreterServer.start();

        /*
         * Registration of a ShutdownHook in case of an unpredictable system call
         * Examples: STRG+C, SIGTERM via kill
         */
        shutdownThread = remoteInterpreterServer.new ShutdownThread(ShutdownThread.CAUSE_SHUTDOWN_HOOK);
        Runtime.getRuntime().addShutdownHook(shutdownThread);

        remoteInterpreterServer.join();
        LOGGER.info("RemoteInterpreterServer thread is finished");

        /* TODO(pdallig): Remove System.exit(0) if the thrift server can be shut down successfully.
         * https://github.com/apache/thrift/commit/9cb1c794cd39cfb276771f8e52f0306eb8d462fd
         * should be part of the next release and solve the problem.
         * We may have other threads that are not terminated successfully.
         */
        if (remoteInterpreterServer.isForceShutdown) {
            LOGGER.info("Force shutting down");
            System.exit(0);
        }
    }

    @Override
    public void run() {
        RemoteInterpreterService.Processor<RemoteInterpreterServer> processor =
                new RemoteInterpreterService.Processor<>(this);
        try (TServerSocket tSocket = new TServerSocket(port)) {
            server = new TThreadPoolServer(
                    new TThreadPoolServer.Args(tSocket)
                            .stopTimeoutVal(DEFAULT_SHUTDOWN_TIMEOUT)
                            .stopTimeoutUnit(TimeUnit.MILLISECONDS)
                            .processor(processor));

            if (null != intpEventServerHost && !isTest) {
                Thread registerThread = new Thread(new RegisterRunnable());
                registerThread.setName("RegisterThread");
                registerThread.start();
            }
            LOGGER.info("Launching ThriftServer at {}:{}", this.host, this.port);
            server.serve();
        } catch (TTransportException e) {
            LOGGER.error("Failure in TTransport", e);
        }
        LOGGER.info("RemoteInterpreterServer-Thread finished");
    }
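The `main`/`run` split follows a common pattern: a thread that owns a server socket, a pool of worker threads for requests, and a shutdown hook. Thrift is an external library, so the sketch below substitutes plain `java.net.ServerSocket` and an `ExecutorService` for `TServerSocket` and `TThreadPoolServer`. `MiniInterpreterServer`, `launch`, `sendScript` and `stopAndWait` are hypothetical names. The one-line text protocol is invented purely for illustration and is unlike Thrift's RPC.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for RemoteInterpreterServer: JDK sockets and a thread
// pool replace Thrift's TServerSocket and TThreadPoolServer.
class MiniInterpreterServer extends Thread {
    private final ServerSocket serverSocket;
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    private MiniInterpreterServer(ServerSocket serverSocket) {
        super("MiniInterpreterServer-Thread");
        this.serverSocket = serverSocket;
    }

    // Like main(): bind (port 0 = any free port), start the thread, and register
    // a shutdown hook so that Ctrl+C or SIGTERM closes the socket.
    static MiniInterpreterServer launch(int port) {
        try {
            MiniInterpreterServer server = new MiniInterpreterServer(new ServerSocket(port));
            server.start();
            Runtime.getRuntime().addShutdownHook(new Thread(server::closeSocket));
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Like server.serve(): accept until the socket is closed, one pool task per client.
    @Override
    public void run() {
        try {
            while (true) {
                Socket client = serverSocket.accept();
                workers.submit(() -> handle(client));
            }
        } catch (SocketException e) {
            // Expected: closeSocket() makes accept() throw.
        } catch (IOException e) {
            System.err.println("Failure in server socket: " + e);
        } finally {
            workers.shutdown();
        }
    }

    // Reads one line (the "script") and answers with one line (a fake "result").
    private void handle(Socket client) {
        try (Socket s = client;
             BufferedReader in = reader(s); PrintWriter out = writer(s)) {
            out.println("result: " + in.readLine());
        } catch (IOException e) {
            System.err.println("Client error: " + e);
        }
    }

    private static BufferedReader reader(Socket s) throws IOException {
        return new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
    }

    private static PrintWriter writer(Socket s) throws IOException {
        return new PrintWriter(new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    int getLocalPort() { return serverSocket.getLocalPort(); }

    void closeSocket() {
        try { serverSocket.close(); } catch (IOException ignored) { /* shutting down anyway */ }
    }

    // Closes the socket, then waits (like join()) for run() to return.
    boolean stopAndWait(long millis) {
        closeSocket();
        try { join(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return !isAlive();
    }

    // Minimal client for the usage example: send one script, read one reply.
    static String sendScript(int port, String script) {
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port);
             PrintWriter out = writer(s); BufferedReader in = reader(s)) {
            out.println(script);
            return in.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

As in the original, `run()` blocks while serving. Closing the socket from another thread is what makes it return. Here that is `stopAndWait`; in the original code a `ShutdownThread` is registered as the shutdown hook.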


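Apache Zeppelin Tutorial Series

Apache Zeppelin Tutorial Series, Part 1: Installation and Usage

Apache Zeppelin Tutorial Series, Part 2: Overall Architecture

Apache Zeppelin Tutorial Series, Part 3: Persistence Management of Notes

Apache Zeppelin Tutorial Series, Part 4: How JDBCInterpreter Works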


References:

Using mock: https://zhuanlan.zhihu.com/p/51673406

Apache Thrift: https://juejin.cn/post/6844903622380093447

This article is reposted from the blog of 诸葛子房.
