Python DStream to Scala DStream
When creating tools and utilities on top of Spark, you often need to create Python bindings so that your Scala code can be called from Python. There is a lot of documentation out there about exposing Scala (or Java) DStreams to Python, but not much about sending a DStream back to Scala.
Luckily, doing so is not that difficult. The key is the SerDeUtil object that is part of the Spark codebase. This object contains a pythonToJava method that takes an RDD of serialized (pickled) Python objects and converts it to a JavaRDD. With this SerDeUtil object we can then create the following DStreamSupport utility:
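The helper itself was not preserved in this copy of the post; a minimal sketch might look like the following. The `batched` default is an assumption about PySpark's batch pickling (PySpark's default serializer pickles records in batches), and `SerDeUtil.pythonToJava` is the package-private Spark method described above:

```scala
package org.apache.spark.api.python

import org.apache.spark.streaming.api.java.JavaDStream

// Sketch of the helper described above. It lives in the
// org.apache.spark.api.python package so that it can reach the
// package-private SerDeUtil object. The `batched` flag tells SerDeUtil
// whether the Python side pickled its records in batches.
object DStreamSupport {
  def pythonToJava(
      pyStream: JavaDStream[Array[Byte]],
      batched: Boolean = true): JavaDStream[Any] = {
    // Convert each pickled RDD in the stream as it arrives.
    pyStream.transform(rdd => SerDeUtil.pythonToJava(rdd, batched))
  }
}
```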
Note that the package has to be org.apache.spark.api.python, because SerDeUtil is not a public API of Spark. It is marked as package private, so the only way we can access it is by placing DStreamSupport in the same package.
With our helper in place, we can now create a Scala method that can be called from Python, for example:
def writeBytes(stream: JavaDStream[Array[Byte]]): Unit = {
  val converted = DStreamSupport.pythonToJava(stream)
    .asInstanceOf[JavaDStream[Array[Byte]]]
  val dstream: DStream[Array[Byte]] = converted.dstream
  // ... use the Scala DStream as normal from here on
}
When calling the writeBytes method above from Python, we expect a Python DStream with Array[Byte] content (e.g. serialized protobuf messages) that has been pickled by the Python pickler. This DStream is then converted by our DStreamSupport.pythonToJava() method into a proper JavaDStream with Array[Byte] content that our Java/Scala code can deserialize (e.g. back into protobuf message objects).
Assuming we place our writeBytes method in an object called PythonHelper (in package com.example), we can now call it from Python as follows:
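The Python snippet is also missing from this copy; a sketch of such a wrapper could look like the following. It relies on two PySpark internals, `dstream._jdstream` (the underlying JavaDStream of pickled records) and `dstream.ctx._jvm` (the py4j view into the JVM), and on our com.example.PythonHelper object from above:

```python
def write_stream(dstream):
    # Reach through the py4j gateway to our Scala object
    # com.example.PythonHelper.
    helper = dstream.ctx._jvm.com.example.PythonHelper

    # Hand the underlying JavaDStream of pickled records to Scala;
    # DStreamSupport.pythonToJava on the Scala side turns it back
    # into usable Array[Byte] records.
    helper.writeBytes(dstream._jdstream)
```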
The Python write_stream() method takes a PySpark DStream object and passes it on to our Scala writeBytes() method.