Apache Spark - Dataproc Avro Version Causing Error on Image v1.0.0


We are running a few Dataproc jobs on Dataproc image 1.0 with spark-redshift.

We have 2 clusters; here are the details:

  • Cluster A -> runs a PySpark streaming job, last created 2016. Jul 15. 11:27:12 AEST
  • Cluster B -> runs PySpark batch jobs; the cluster is created every time a job is run and torn down afterwards.
  • A & B run the same code base and use the same init script, same node types, etc.

Since sometime last Friday (2016-08-05 AEST), our code stopped working on cluster B with the following error, while cluster A is running without issues.

The following code reproduces the issue on cluster B (or on any new cluster with image v1.0.0), while it runs fine on cluster A.

Sample PySpark code:

    from pyspark import SparkContext, SQLContext

    sc = SparkContext()
    sql_context = SQLContext(sc)  # needed so that rdd.toDF() is available

    rdd = sc.parallelize([{'user_id': 'test'}])
    df = rdd.toDF()

    # Placeholder S3 credentials for the temp dir used by spark-redshift
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "foo")
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "bar")

    df \
        .write \
        .format("com.databricks.spark.redshift") \
        .option("url", "jdbc:redshift://foo.ap-southeast-2.redshift.amazonaws.com/bar") \
        .option("dbtable", 'foo') \
        .option("tempdir", "s3n://bar") \
        .option("extracopyoptions", "truncatecolumns") \
        .mode("append") \
        .save()

The above code fails in both of the following situations on cluster B, while running fine on A. Note that RedshiftJDBC41-1.1.10.1010.jar is created via the cluster init script.

  • Running in interactive mode on the master node:

    PYSPARK_DRIVER_PYTHON=ipython pyspark \
        --verbose \
        --master "local[*]" \
        --jars /usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar \
        --packages com.databricks:spark-redshift_2.10:1.0.0

  • Submitting the job via gcloud dataproc:

    gcloud --project foo \
        dataproc jobs submit pyspark \
        --cluster bar \
        --properties ^#^spark.jars.packages=com.databricks:spark-redshift_2.10:1.0.0#spark.jars=/usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar \
        foo.bar.py
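
A note on the ^#^ prefix in the --properties value: gcloud's escaping syntax lets a list or dictionary flag begin with ^DELIM^ to replace the default comma delimiter, which matters because Spark properties such as spark.jars.packages can themselves contain commas. The sketch below is only an illustrative model of that splitting rule, not gcloud's own code; the function name parse_gcloud_properties and the sample values are hypothetical.

```python
def parse_gcloud_properties(value):
    """Split a gcloud-style dict flag value into a {key: value} dict.

    An optional ^DELIM^ prefix selects the delimiter; without it,
    entries are separated by commas.
    """
    delimiter = ","
    if value.startswith("^"):
        end = value.index("^", 1)   # position of the closing caret
        delimiter = value[1:end]
        value = value[end + 1:]

    properties = {}
    for item in value.split(delimiter):
        if not item:
            continue                # ignore empty segments
        key, _, val = item.partition("=")
        properties[key] = val
    return properties


# Hypothetical value: the comma inside spark.jars.packages survives
# because '#' is the delimiter, not ','.
flag = "^#^spark.jars.packages=g1:a1:1.0,g2:a2:2.0#spark.jars=/tmp/driver.jar"
for key, val in parse_gcloud_properties(flag).items():
    print(key, "=", val)
```

With the default comma delimiter, the same value would be cut in the middle of the package list, which is presumably why the command above switches to #.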

The error it produces (trace):

    2016-08-08 06:12:23 WARN  TaskSetManager:70 - Lost task 6.0 in stage 45.0 (TID 121275, foo.bar.internal):
        java.lang.NoSuchMethodError: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;
        at org.apache.avro.mapreduce.AvroKeyRecordWriter.<init>(AvroKeyRecordWriter.java:55)
        at org.apache.avro.mapreduce.AvroKeyOutputFormat$RecordWriterFactory.create(AvroKeyOutputFormat.java:79)
        at org.apache.avro.mapreduce.AvroKeyOutputFormat.getRecordWriter(AvroKeyOutputFormat.java:105)
        at com.databricks.spark.avro.AvroOutputWriter.<init>(AvroOutputWriter.scala:82)
        at com.databricks.spark.avro.AvroOutputWriterFactory.newInstance(AvroOutputWriterFactory.scala:31)
        at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    2016-08-08 06:12:24 ERROR YarnScheduler:74 - Lost executor 63 on kinesis-ma-sw-o7he.c.bupa-ma.internal: Container marked as failed: container_1470632577663_0003_01_000065 on host: kinesis-ma-sw-o7he.c.bupa-ma.internal. Exit status: 50. Diagnostics: Exception from container-launch.
    Container id: container_1470632577663_0003_01_000065
    Exit code: 50
    Stack trace: ExitCodeException exitCode=50:
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
        at org.apache.hadoop.util.Shell.run(Shell.java:456)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
        at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

spark-redshift:1.0.0 requires com.databricks.spark-avro:2.0.1, which in turn requires org.apache.avro:1.7.6.

Upon checking the version of org.apache.avro.generic.GenericData on cluster A:

    root@foo-bar-m:/home/foo# spark-shell \
    >     --verbose \
    >     --master "local[*]" \
    >     --deploy-mode client \
    >     --packages com.databricks:spark-redshift_2.10:1.0.0 \
    >     --jars "/usr/lib/hadoop/lib/RedshiftJDBC41-1.1.10.1010.jar"

It produces (trace):

    scala> import org.apache.avro.generic._
    import org.apache.avro.generic._

    scala> val c = GenericData.get()
    c: org.apache.avro.generic.GenericData = org.apache.avro.generic.GenericData@496a514f

    scala> c.getClass.getProtectionDomain().getCodeSource()
    res0: java.security.CodeSource = (file:/usr/lib/hadoop/lib/bigquery-connector-0.7.5-hadoop2.jar <no signer certificates>)

While running the same command on cluster B:

    scala> import org.apache.avro.generic._
    import org.apache.avro.generic._

    scala> val c = GenericData.get()
    c: org.apache.avro.generic.GenericData = org.apache.avro.generic.GenericData@72bec302

    scala> c.getClass.getProtectionDomain().getCodeSource()
    res0: java.security.CodeSource = (file:/usr/lib/hadoop/lib/bigquery-connector-0.7.7-hadoop2.jar <no signer certificates>)
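
The shell sessions above report only the one jar that the classloader happened to pick. To list every jar in a directory that bundles the same class, something like the following could be run on each cluster and the results compared. This is a minimal sketch that assumes the candidate jars all sit in /usr/lib/hadoop/lib (the directory shown in the output above); the helper name jars_containing is hypothetical, and jars elsewhere on the Spark classpath are not covered.

```python
import zipfile
from pathlib import Path


def jars_containing(class_name, lib_dir):
    """Return the sorted names of jars in lib_dir that bundle class_name."""
    # A class a.b.C is stored inside a jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    matches.append(jar.name)
        except (zipfile.BadZipFile, OSError):
            continue  # skip unreadable files and broken symlinks
    return matches


# Prints nothing if the directory does not exist on this machine.
for name in jars_containing("org.apache.avro.generic.GenericData",
                            "/usr/lib/hadoop/lib"):
    print(name)
```

If more than one jar is printed, which copy wins depends on classpath order rather than on anything the job itself declares.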

Screenshot of the environment on cluster B (apologies for the redactions). We've tried the methods described here and here, without success.

This is frustrating, as Dataproc updates the image content without bumping the release version, which is the complete opposite of immutable releases. Our code broke, and there is no way to roll back to the previous version.

Sorry for the trouble! It's not intended for breaking changes to occur within an image version. Note that subminor versions are rolled out "under the hood" for non-breaking bug fixes and Dataproc-specific patches.

You can revert to the 1.0.* version from before last week by specifying --image-version 1.0.8 when deploying clusters from the command line:

    gcloud dataproc clusters create --image-version 1.0.8

Edit: For additional clarification, we've investigated the Avro versions in question and verified that the Avro version numbers did not change in any recent subminor Dataproc release. The core issue is that Hadoop itself has had a latent bug: Hadoop brings avro-1.7.4 under /usr/lib/hadoop/lib/, while Spark uses avro-1.7.7. Coincidentally, Google's BigQuery connector also uses avro-1.7.7, but this turns out to be orthogonal to the known Spark/Hadoop problem with 1.7.4 vs 1.7.7. The recent image update was deemed non-breaking because the versions in fact did not change, but the classloading order changed in a nondeterministic way: Hadoop's bad Avro version used to be hidden from the Spark job by pure luck, and it is no longer accidentally hidden in the latest image.
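
To make the ordering effect concrete, here is a toy model in which the first jar on a flat search path that provides a class wins. It is a deliberate simplification (real JVM classloading involves a hierarchy of classloaders), and the jar names are hypothetical; only the two Avro version numbers come from the explanation above.

```python
def resolve_class(class_name, classpath):
    """Return the first jar on the classpath that provides class_name.

    classpath is an ordered list of (jar_name, set_of_class_names) pairs,
    a simplified stand-in for a flat JVM classpath.
    """
    for jar_name, classes in classpath:
        if class_name in classes:
            return jar_name
    return None


GENERIC_DATA = "org.apache.avro.generic.GenericData"

# Hypothetical jar names; both provide the same class name.
hadoop_avro = ("avro-1.7.4.jar", {GENERIC_DATA})
spark_avro = ("avro-1.7.7.jar", {GENERIC_DATA})

lucky_order = [spark_avro, hadoop_avro]    # older copy is shadowed
unlucky_order = [hadoop_avro, spark_avro]  # older copy is picked up

print(resolve_class(GENERIC_DATA, lucky_order))    # avro-1.7.7.jar
print(resolve_class(GENERIC_DATA, unlucky_order))  # avro-1.7.4.jar
```

Neither jar changed between the two runs; only the order did, which matches the observation that the image's version numbers stayed the same while the job started failing.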

Dataproc's preview image currently includes a fix for the Avro version in the Hadoop layer, which should make it into any future Dataproc 1.1 version when it comes out; you might want to consider trying the preview version to see if Spark 2.0 is a seamless transition.

