Python DStream to Scala DStream

Ferdinand de Antoni

--

When building tools and utilities on top of Spark, you will sooner or later need Python bindings so that your Scala code can be called from Python. There is plenty of documentation out there about exposing Scala (or Java) DStreams to Python, but not much about sending a DStream back to Scala.

Luckily, doing so is not that difficult. The key is the SerDeUtil object that is part of the Spark codebase. It provides a pythonToJava method that takes an RDD of serialized (pickled) Python objects and converts it to a JavaRDD. With SerDeUtil we can create the following DStreamSupport utility:
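(The original listing did not survive here; the following is a minimal sketch of what such a DStreamSupport object could look like. It assumes Spark's internal SerDeUtil.pythonToJava(pyRDD, batched) signature, and the transform-per-micro-batch approach and the batched flag are assumptions based on how PySpark pickles RDD records.)

```scala
package org.apache.spark.api.python

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.streaming.api.java.JavaDStream

// Must live in org.apache.spark.api.python: SerDeUtil is package
// private, so placing this object in the same package is the only
// way to reach it from our own code.
object DStreamSupport {

  // Unpickle each micro-batch of a Python byte stream so that
  // Scala/Java code can consume the resulting JavaDStream.
  def pythonToJava(stream: JavaDStream[Array[Byte]]): JavaDStream[Any] = {
    val unpickled = stream.dstream.transform { rdd =>
      // true = records are pickled in batches (the PySpark default)
      SerDeUtil.pythonToJava(JavaRDD.fromRDD(rdd), true).rdd
    }
    new JavaDStream(unpickled)
  }
}
```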

Note that the package has to be org.apache.spark.api.python because SerDeUtil is not part of Spark's public API. It is marked package private, so the only way we can access it is by placing DStreamSupport in the same package.

With our helper in place, we can now create a Scala method that can be called from Python, for example:

def writeBytes(stream: JavaDStream[Array[Byte]]): Unit = {
  val converted = DStreamSupport.pythonToJava(stream)
    .asInstanceOf[JavaDStream[Array[Byte]]]
  val dstream: DStream[Array[Byte]] = converted.dstream
  // ... use the Scala DStream as normal from here on
}

When calling the writeBytes method above from Python, we expect a Python DStream whose records are Array[Byte] content (e.g. serialized protobuf messages) pickled by the Python pickler. This DStream is then converted by our DStreamSupport.pythonToJava() method into a proper JavaDStream with Array[Byte] content that our Java/Scala code can deserialize (e.g. back into protobuf message objects).

Assuming we place our writeBytes method in an object called PythonHelper (in package com.example), we can now call it from Python as follows:
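(The original Python listing did not survive here; a minimal sketch could look like the following. Note that _jdstream and _jvm are private PySpark/py4j internals rather than public API, and the com.example.PythonHelper path simply follows the object and package named above.)

```python
def write_stream(dstream):
    """Hand a PySpark DStream of pickled byte records to the Scala side.

    Relies on PySpark internals: `_jdstream` is the underlying
    JavaDStream of pickled bytes, and `_jvm` is the py4j gateway
    into the driver JVM.
    """
    sc = dstream.context().sparkContext
    helper = sc._jvm.com.example.PythonHelper
    helper.writeBytes(dstream._jdstream)
```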

The Python write_stream() method takes a PySpark DStream object and passes it on to our Scala writeBytes() method.
