Mutable Ideas

Notes and ideas about Java, Scala, Big Data, NoSQL, Quality and Software Deploy

Making Hadoop 2.6 + Spark-Cassandra Driver Play Nice Together

We had been using the Spark standalone deploy for more than a year, but recently I tried Azure’s HDInsight, which runs on Hadoop 2.6 (YARN deploy).

After provisioning the servers, all the small tests worked fine: I was able to run spark-shell and to read from and write to Blob Storage, until I tried to write to a Datastax Cassandra cluster, which constantly returned the error message: Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {10.0.1.4}:9042

Full stack trace:

I have to confess I am a little ashamed of spending so much time on the Failed to open native connection to Cassandra at {10.0.1.4}:9042 part of the error, instead of looking for the root cause further down the stack trace: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback
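The NoSuchMethodError is a classpath conflict: the Guava version Hadoop puts on the YARN classpath predates Futures.withFallback, so it shadows the newer Guava the Cassandra driver was compiled against. A quick way to see which jar a class is actually being loaded from is to ask its ProtectionDomain; the sketch below can be pasted into spark-shell on the cluster (the class name argument is just an example):

```scala
// Diagnostic sketch: report which jar (or the bootstrap classpath) a given
// class is loaded from at runtime.
def jarOf(className: String): String = {
  val src = Class.forName(className).getProtectionDomain.getCodeSource
  // Classes loaded by the bootstrap class loader have no CodeSource.
  if (src == null) "bootstrap classpath" else src.getLocation.toString
}

// e.g. jarOf("com.google.common.util.concurrent.Futures") should point
// at the offending (older) Guava jar on the YARN classpath.
```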

Exception in thread "main" java.io.IOException: Failed to open native connection to Cassandra at {10.0.1.4}:9042
  at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:164)
  at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
  at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
  at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
  at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
  at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
  at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
  at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
  at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:241)
  at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:263)
  at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
  at App$.main(App.scala:30)
  at App.main(App.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback(Lcom/google/common/util/concurrent/ListenableFuture;Lcom/google/common/util/concurrent/FutureFallback;Ljava/util/concurrent/Executor;)Lcom/google/common/util/concurrent/ListenableFuture;
  at com.datastax.driver.core.Connection.initAsync(Connection.java:176)
  at com.datastax.driver.core.Connection$Factory.open(Connection.java:724)
  at com.datastax.driver.core.ControlConnection.tryConnect(ControlConnection.java:250)
  at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:196)
  at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:84)
  at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1269)
  at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:338)
  at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:157)
  ... 21 more

After spending a few hours making sure the network was indeed correctly configured, I started searching for the com.google.common.util.concurrent.Futures.withFallback error and found a lot of people asking the same question; a few of those discussions eventually led me to the solution.

Solution

After reading those tickets the solution was pretty obvious: all I had to do was shade Guava in my package, which is simple with a recent version of sbt-assembly. Adding a rule that renames com.google.** classes to shadeio.** was enough, and everything worked as it should!

Below are snippets of my project/assembly.sbt and build.sbt files:

// project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0")
// build.sbt

import sbt.Keys._

name := "spark-dse-test"

version := "1.0"

scalaVersion := "2.10.5"

scalacOptions := Seq("-deprecation", "-unchecked", "-feature")

libraryDependencies ++= Seq(
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M2",
  "org.apache.spark" %% "spark-core" % "1.5.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.5.1" % "provided"
)

// There is a conflict between the Guava versions shipped by the
// Cassandra driver and by Hadoop, so shade the Guava package
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.**" -> "shadeio.@1").inAll
)

assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
  case _ => MergeStrategy.first
}
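To double-check that the ShadeRule actually took effect, you can scan the assembled jar for leftover unshaded Guava classes. This is a sketch using only the JDK's jar API; pass it the path that assemblyJarName produced (an empty result means everything was relocated):

```scala
import java.util.jar.JarFile

// Sketch: list any unshaded Guava class files left in an assembly jar.
def unshadedGuavaEntries(jarPath: String): List[String] = {
  val jar = new JarFile(jarPath)
  try {
    var found = List.empty[String]
    val entries = jar.entries()
    while (entries.hasMoreElements) {
      val name = entries.nextElement().getName
      // Anything still under com/google/common escaped the rename rule.
      if (name.startsWith("com/google/common/")) found ::= name
    }
    found.reverse
  } finally jar.close()
}
```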

I hope this makes fixing this issue easier for you than it was for me :)

