上一篇文章以JDBCInterpreter为例，讲解了JDBC语句的实际执行过程。Zeppelin的整体架构如下图所示（原文架构图在转载中缺失）：
简单来说，就是Web前端向Server发送请求，Server调用zengine，zengine再调用Interpreter，最后到达实际的执行模块，比如上文介绍的JDBCInterpreter。
本文重点分析Interpreter模块，先从下面这个测试类入手：
zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@Testpublic void testInterpreter2() throws Exception {final RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", "groupId", true);server.init(new HashMap<>());server.intpEventClient = mock(RemoteInterpreterEventClient.class);Map<String, String> intpProperties = new HashMap<>();intpProperties.put("property_1", "value_1");intpProperties.put("zeppelin.interpreter.localRepo", "/tmp");// create Test1Interpreter in session_1server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),intpProperties, "user_1");Test1Interpreter interpreter1 = (Test1Interpreter)((LazyOpenInterpreter) server.getInterpreterGroup().get("session_1").get(0)).getInnerInterpreter();assertEquals(1, server.getInterpreterGroup().getSessionNum());assertEquals(1, server.getInterpreterGroup().get("session_1").size());assertEquals(2, interpreter1.getProperties().size());assertEquals("value_1", interpreter1.getProperty("property_1"));// create Test2Interpreter in session_1server.createInterpreter("group_1", "session_1", Test2Interpreter.class.getName(),intpProperties, "user_1");assertEquals(2, server.getInterpreterGroup().get("session_1").size());final RemoteInterpreterContext intpContext = new RemoteInterpreterContext();intpContext.setNoteId("note_1");intpContext.setParagraphId("paragraph_1");intpContext.setGui("{}");intpContext.setNoteGui("{}");intpContext.setLocalProperties(new HashMap<>());// single output of SUCCESSRemoteInterpreterResult result = server.interpret("session_1", Test2Interpreter.class.getName(),"COMBO_OUTPUT_SUCCESS", intpContext);System.out.println(new Gson().toJson(result));//List<InterpreterResultMessage> resultMessages = intpContext.out.toInterpreterResultMessage();//System.out.println(new Gson().toJson(resultMessages));/*assertEquals("SUCCESS", result.code);assertEquals(2, result.getMsg().size());assertEquals("INTERPRETER_OUT", 
result.getMsg().get(0).getData());assertEquals("SINGLE_OUTPUT_SUCCESS", result.getMsg().get(1).getData());*/}
// Load the interpreter class by name and instantiate it through its (Properties) constructor.
// The cast is unchecked: it is not verified until the constructed instance is assigned to an
// Interpreter variable below.
@SuppressWarnings("unchecked")
Class<Interpreter> replClass = (Class<Interpreter>) Class.forName(className);
Properties p = new Properties();
p.putAll(properties);
// NOTE(review): presumably propagates some of these properties to JVM system properties;
// confirm against setSystemProperty.
setSystemProperty(p);
Constructor<Interpreter> constructor = replClass.getConstructor(Properties.class);
Interpreter interpreter = constructor.newInstance(p);
interpreter.setClassloaderUrls(new URL[]{});
interpreter.setInterpreterGroup(interpreterGroup);
interpreter.setUserName(userName);
// Register the interpreter wrapped in LazyOpenInterpreter; jobRun calls open() on it
// the first time a paragraph runs.
interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), sessionId);
public InterpreterResult jobRun() throws Throwable {ClassLoader currentThreadContextClassloader = Thread.currentThread().getContextClassLoader();try {InterpreterContext.set(context);// clear the result of last run in frontend before running this paragraph.context.out.clear();InterpreterResult result = null;// Open the interpreter instance prior to calling interpret().// This is necessary because the earliest we can register a hook// is from within the open() method.LazyOpenInterpreter lazy = (LazyOpenInterpreter) interpreter;if (!lazy.isOpen()) {lazy.open();result = lazy.executePrecode(context);}if (result == null || result.code() == Code.SUCCESS) {// Add hooks to script from registry.// note scope first, followed by global scope.// Here's the code after hooking:// global_pre_hook// note_pre_hook// script// note_post_hook// global_post_hookprocessInterpreterHooks(context.getNoteId());processInterpreterHooks(null);LOGGER.debug("Script after hooks: {}", script);result = interpreter.interpret(script, context);}// data from context.out is prepended to InterpreterResult if both definedcontext.out.flush();List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();for (InterpreterResultMessage resultMessage : result.message()) {// only add non-empty InterpreterResultMessageif (!StringUtils.isBlank(resultMessage.getData())) {resultMessages.add(resultMessage);}}List<String> stringResult = new ArrayList<>();for (InterpreterResultMessage msg : resultMessages) {if (msg.getType() == InterpreterResult.Type.IMG) {LOGGER.debug("InterpreterResultMessage: IMAGE_DATA");} else {LOGGER.debug("InterpreterResultMessage: {}", msg);}stringResult.add(msg.getData());}// put result into resource poolif (context.getLocalProperties().containsKey("saveAs")) {if (stringResult.size() == 1) {LOGGER.info("Saving result into ResourcePool as single string: " 
+context.getLocalProperties().get("saveAs"));context.getResourcePool().put(context.getLocalProperties().get("saveAs"), stringResult.get(0));} else {LOGGER.info("Saving result into ResourcePool as string list: " +context.getLocalProperties().get("saveAs"));context.getResourcePool().put(context.getLocalProperties().get("saveAs"), stringResult);}}return new InterpreterResult(result.code(), resultMessages);} catch (Throwable e) {return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));} finally {Thread.currentThread().setContextClassLoader(currentThreadContextClassloader);InterpreterContext.remove();}}
至此，这段代码已经和上一篇文章中的JDBCInterpreter代码衔接起来了：jobRun中的interpreter.interpret(script, context)就是Interpreter模块调用具体的JDBCInterpreter执行语句的过程。
RemoteInterpreterServer的main方法会启动服务线程，真正的Thrift服务端是在该线程的run方法中启动的：
/**
 * Process entry point for a remote interpreter.
 *
 * <p>Arguments are either empty, or {@code <zeppelinServerHost> <port> <interpreterGroupId>
 * [portRange]}. With no arguments the host and group id stay {@code null}, the port falls back
 * to {@code Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT}, and the port range is {@code ":"}.
 *
 * <p>Starts the server thread, registers a JVM shutdown hook, and waits for the thread to
 * finish. If a forced shutdown was requested, it then calls {@code System.exit(0)}.
 *
 * @param args command-line arguments as described above
 * @throws IllegalArgumentException if only one or two arguments are given, or the port is not
 *     a number
 */
public static void main(String[] args) throws Exception {
  String zeppelinServerHost = null;
  int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
  String portRange = ":";
  String interpreterGroupId = null;
  if (args.length > 0) {
    // Host, port and group id are read together. Fail with a clear message instead of an
    // ArrayIndexOutOfBoundsException when only some of them are supplied.
    if (args.length < 3) {
      throw new IllegalArgumentException(
          "Expected arguments: <zeppelinServerHost> <port> <interpreterGroupId> [portRange], got "
              + args.length + " argument(s)");
    }
    zeppelinServerHost = args[0];
    port = Integer.parseInt(args[1]);
    interpreterGroupId = args[2];
    if (args.length > 3) {
      portRange = args[3];
    }
  }
  RemoteInterpreterServer remoteInterpreterServer =
      new RemoteInterpreterServer(zeppelinServerHost, port, interpreterGroupId, portRange);
  remoteInterpreterServer.start();

  /*
   * Registration of a ShutdownHook in case of an unpredictable system call
   * Examples: Ctrl+C, SIGTERM via kill
   */
  shutdownThread = remoteInterpreterServer.new ShutdownThread(ShutdownThread.CAUSE_SHUTDOWN_HOOK);
  Runtime.getRuntime().addShutdownHook(shutdownThread);

  remoteInterpreterServer.join();
  LOGGER.info("RemoteInterpreterServer thread is finished");

  /* TODO(pdallig): Remove System.exit(0) if the thrift server can be shut down successfully.
   * https://github.com/apache/thrift/commit/9cb1c794cd39cfb276771f8e52f0306eb8d462fd
   * should be part of the next release and solve the problem.
   * We may have other threads that are not terminated successfully.
   */
  if (remoteInterpreterServer.isForceShutdown) {
    LOGGER.info("Force shutting down");
    System.exit(0);
  }
}
/**
 * Server-thread body: builds a Thrift thread-pool server for this RemoteInterpreterServer on
 * {@code port} and blocks in {@code serve()}.
 *
 * <p>When an event-server host is configured and this is not a test instance, a background
 * thread named "RegisterThread" running {@code RegisterRunnable} is started first.
 * NOTE(review): that thread presumably registers this process with the Zeppelin server;
 * confirm against RegisterRunnable. A transport failure is logged rather than rethrown.
 */
@Override
public void run() {
  RemoteInterpreterService.Processor<RemoteInterpreterServer> thriftProcessor =
      new RemoteInterpreterService.Processor<>(this);

  try (TServerSocket serverTransport = new TServerSocket(port)) {
    TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport)
        .stopTimeoutVal(DEFAULT_SHUTDOWN_TIMEOUT)
        .stopTimeoutUnit(TimeUnit.MILLISECONDS)
        .processor(thriftProcessor);
    server = new TThreadPoolServer(serverArgs);

    boolean shouldRegister = intpEventServerHost != null && !isTest;
    if (shouldRegister) {
      Thread registrar = new Thread(new RegisterRunnable());
      registrar.setName("RegisterThread");
      registrar.start();
    }

    LOGGER.info("Launching ThriftServer at {}:{}", this.host, this.port);
    server.serve();
  } catch (TTransportException transportError) {
    LOGGER.error("Failure in TTransport", transportError);
  }

  LOGGER.info("RemoteInterpreterServer-Thread finished");
}
Apache Zeppelin 系列教程
Apache Zeppelin系列教程第三篇——Note的持久化管理
Apache Zeppelin系列教程第四篇——JDBCInterpreter原理分析
参考:
文章转载自诸葛子房的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




