暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

pyspark 二次启动报错问题排查

海涛技术漫谈 2020-04-11
1743

问题描述

近期,在使用notebook
的过程中,使用spark
的时候,出现了如下一个必现的问题:

通过spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
方式启动spark
,第一次启动spark
,启动一半还未成功时,手动将其中止了,然后按照上述方式,再次启动spark
的时候,必报如下错误,完整的日志信息见文末。

错误说的很明确,JVM
已经有sparkcontext
了,不允许再次创建,同时,也说了,可
通过设置 spark.driver.allowMultipleContexts = true
,开启多sparkcontext

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243).
To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:

简单分析

通过查看日志,翻阅pyspark
的代码和其架构原理,发现导致此问题的原因:

是因为pyspark
已经将创建sparkcontext
的命名,通过py4j
发送到JVM Drive
了,在JVM Drive
层面,通过反射动态的初始化sparkContext
对象实例,

然后,创建spark
的时候,执行到assertNoOtherContextIsRunning
,因为当前已经有sparkContext
在执行,报错抛出异常。

虽然代码的执行被中止,但是相关的命令已经发送到py4j server
端,所以JVM
中还会继续完成sparkContext
的创建工作。

而且python
进程中的sparkContext
实例和JVM Drive
中的sparkContext
实例是相呼应的,在pyspark
中的SparkContext
类中,有一系列的方法控制JVM
sparkContext
的相关操作。

python
代码被中断后,pyspark
sparkContext
未创建成功,所以,JVM
中的sparkContext
将失去束缚,无法直接被操作了。

所以,再次执行getorcreate()
创建spark
的时候,发现JVM
已经有sparkContext
, 就报错了。

通过jps
查看当前系统的java
进程,然后jmap -dump:live,format=b,file=a.log <pid>
下载了当前Java
进程里存活对象的堆栈信息,

然后简单的通过jhat
简单分析堆栈文件,jhat a.log
,发现确实有sparkcontext
对象存在于jvm
中。


可能的解决思路

  1. 想办法复用JVM Driver
    sparkContext

  2. 将之前残留的sparkContext
    ,通过某种方式清理掉

  3. 设置allowMultipleContext
    开启多sparkContext

  4. 用户手册告知用户,出现此错误时,点击右上角按钮,restart
    当前的kernel

复用JVM Driver的sparkcontext

显然,如果能够做到,这是最合理,最佳的实践方式。虽然,我们使用pyspark
的 getorcreate()
方法,现在也有sparkcontext

直观看,貌似也是应该复用,直接拿当前的sparkContext
对象,但是这个getorcreate()
方法,是pyspark
层面的,也就是Ipyhton
内核这个进程内的,如果当前已经有创建好的sparkcontext
,就get
, 否则就create

所以,此情形下,Ipython
进程给JVM Driver
发送的命令其实都是,创建sparkcontext
示例对象,代码如下

# Create the Java SparkContext through Py4J
self._jsc = jsc or self._initialize_context(self._conf._jconf)

def _initialize_context(self, jconf):
    """
    Initialize SparkContext in function to allow subclass specific initialization
    """

    return self._jvm.JavaSparkContext(jconf)

然后,查看spark
scala
源码,在sparkcontext
的伴生对象中,有getOrCreate()
方法,

如果,将pyspark
直接实例化sparkcontext
, 改为调用getorcreate()
,理论上,貌似可行,但是需要改造pyspark
源码,成本较高,方式也不合理,

所以,这条路,貌似走不通。

清理之前的sparkContext

pyspark
的代码,其本身也采取的类似的方式,创建失败了,会完成各类清理工作,

SparkContext
__init__()
方法中,相关代码如下:

try:
    self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                  conf, jsc, profiler_cls)
except:
    # If an error occurs, clean up in order to allow future SparkContext creation:
    self.stop()
    raise

现在问题就是,stop()
方法是SparkContext
中的实例方法,创建spark
过程中止,当前的sparkContext
实例未初始化完成,自然无法在代码中,显式的通过sc
实例调用这个这个stop
方法。

所以,貌似这个方法也无法行得通。

开启allowMultipleContexts

既然当前场景中,暂时无法简单的做到JVM
保持一个sparkcontext
,貌似,只能采取开启allowMultipleContexts

按照要求,spark
conf
中,设置spark.driver.allowMultipleContexts = true
, 发现确实创建成功,不再报错,applicationId = application_1585648660955_513512

通过,使用eclipse MAT
工具分析中止后的Java
dump
文件,和 再次创建spark
后的Java
dump
文件,发现,再次启动后,jvm
有两个sparkcontext
,一个是第一次中止的,一次是新创建的。暂时也不清楚,JVM
中两个sparkContext
是否会埋下什么坑。

中止后,JVM
SparkContext
类的对象情况

再次创建spark
后的Java
堆中,sparkContext
类的对象情况



补充

完整报错信息

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-af0aed49fd56> in <module>
----> 1 get_ipython().run_line_magic('spark''--s3key=keys/appId/hadoop-hdp/zhanhaitao/facf3179-a7c8-46ad-a998-f43dc9c30472')

/opt/conda/envs/notebook/lib/python3.6/site-packages/IPython/core/interactiveshell.py in run_line_magic(self, magic_name, line, _stack_depth)
   2305                 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
   2306             with self.builtin_trap:
-> 2307                 result = fn(*args, **kwargs)
   2308             return result
   2309 

</opt/conda/envs/notebook/lib/python3.6/site-packages/decorator.py:decorator-gen-127in spark(self, line, cell)

/opt/conda/envs/notebook/lib/python3.6/site-packages/IPython/core/magic.py in <lambda>(f, *a, **k)
    185     # but it's overkill for just that one bit of state.
    186     def magic_deco(arg):
--> 187         call = lambda f, *a, **k: f(*a, **k)
    188 
    189         if callable(arg):

/opt/conda/envs/notebook/lib/python3.6/site-packages/sparkmagics/sparkmagics.py in spark(self, line, cell)
    154         print(id(builder))
    155         print(id(builder._lock))
--> 156         spark = builder.getOrCreate()
    157         sc = spark.sparkContext
    158         applicationId = sc.applicationId

/opt/meituan/spark-2.2/python/pyspark/sql/session.py in getOrCreate(self)
    167                     for key, value in self._options.items():
    168                         sparkConf.set(key, value)
--> 169                     sc = SparkContext.getOrCreate(sparkConf)
    170                     # This SparkContext may be an existing one.
    171                     for key, value in self._options.items():

/opt/meituan/spark-2.2/python/pyspark/context.py in getOrCreate(cls, conf)
    332         with SparkContext._lock:
    333             if SparkContext._active_spark_context is None:
--> 334                 SparkContext(conf=conf or SparkConf())
    335             return SparkContext._active_spark_context
    336 

/opt/meituan/spark-2.2/python/pyspark/context.py in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
    116         try:
    117             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
--> 118                           conf, jsc, profiler_cls)
    119         except:
    120             # If an error occurs, clean up in order to allow future SparkContext creation:

/opt/meituan/spark-2.2/python/pyspark/context.py in _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls)
    178 
    179         # Create the Java SparkContext through Py4J
--> 180         self._jsc = jsc or self._initialize_context(self._conf._jconf)
    181         # Reset the SparkConf to the one actually used by the SparkContext in JVM.
    182         self._conf = SparkConf(_jconf=self._jsc.sc().conf())

/opt/meituan/spark-2.2/python/pyspark/context.py in _initialize_context(self, jconf)
    271         Initialize SparkContext in function to allow subclass specific initialization
    272         """
--> 273         return self._jvm.JavaSparkContext(jconf)
    274 
    275     @classmethod

/opt/meituan/spark-2.2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1399         answer = self._gateway_client.send_command(command)
   1400         return_value = get_return_value(
-> 1401             answer, self._gateway_client, None, self._fqn)
   1402 
   1403         for temp_arg in temp_args:

/opt/meituan/spark-2.2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:236)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:214)
java.lang.Thread.run(Thread.java:748)
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2558)
    at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2554)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2554)
    at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:2656)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:2462)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:236)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)


文章转载自海涛技术漫谈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论