.NET for Apache Spark 0.4.0 was released recently, so it is time to test whether it can be used for MQTT streaming as well.
If you followed my series about a real-time data processing pipeline, you probably remember that I used Apache Bahir to retrieve streaming data from Apache ActiveMQ via the MQTT protocol. The data itself was generated by IanniX and forwarded to ActiveMQ using my osc2activemq Docker image.
Preparing .NET for Apache Spark for MQTT Streaming
For the most part, you can follow this quick intro tutorial, which walks you through installing .NET for Apache Spark on either Windows or Linux.
There is one important caveat though:
Apache Bahir, which I use for MQTT Streaming into Spark, is currently available in version 2.3.3 and only works with Apache Spark of the same version.
Keeping that in mind, you can either download Apache Spark 2.3.3 directly from the download page or just use my Linux Docker image tagged 0.4.0-spark-2.3.3-linux.
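Because the versions have to match exactly, it can be handy to check this before submitting anything. The following is a minimal sketch of such a check and not part of my original setup. The function name spark_matches_bahir is hypothetical. The check assumes you pass in the Spark version yourself, for example as read off the output of spark-submit --version.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check: does the given Spark version match the
# Apache Bahir MQTT connector version used in this post?
# Assumption: the Spark version is passed in by hand (e.g. read off the
# output of `spark-submit --version`); nothing is detected automatically.
BAHIR_VERSION="2.3.3"

spark_matches_bahir() {
  local spark_version="$1"
  if [ "$spark_version" = "$BAHIR_VERSION" ]; then
    echo "Spark $spark_version matches Bahir $BAHIR_VERSION"
    return 0
  fi
  # Report the mismatch on stderr and signal failure via the exit status
  echo "Spark $spark_version does not match Bahir $BAHIR_VERSION" >&2
  return 1
}
```

For example, spark_matches_bahir 2.3.3 succeeds. In contrast, spark_matches_bahir 2.4.1 prints a mismatch message and returns a non-zero exit status, so a calling script can stop early.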
Building the C# application with .NET Core
Having prepared your test environment, you are now ready to create a new .NET Core project by running the following command in your cmd window or shell:
dotnet new console -o ActiveMqTest
Once the command has created the new folder with the project files, change into it and add the Microsoft.Spark package by entering the following:
dotnet add package Microsoft.Spark --version 0.4.0
Next, open the Program.cs file and replace its contents with the code listed below.
using Microsoft.Spark.Sql;
namespace ActiveMqTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Create a Spark session
            var spark = SparkSession
                .Builder()
                .AppName("ActiveMqTest")
                .GetOrCreate();
            // Connect via MQTT
            var mqttDf = spark.ReadStream()
                .Format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
                .Option("brokerUrl", "tcp://192.168.10.159:1883")
                .Option("clientId", "ActiveMqTest")
                .Option("topic", "OscStream")
                .Load();
            mqttDf.PrintSchema();
            // Write any received data to the console
            var query = mqttDf.WriteStream()
                .Format("console")
                .Start();
            // Run until terminated
            query.AwaitTermination();
        }
    }
}
Save the file and build the project.
dotnet build
Submitting to Spark
Once the application has been built successfully, you can submit it to Spark using the following command:
spark-submit --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.3.3 --class org.apache.spark.deploy.dotnet.DotnetRunner --master local bin\Debug\netcoreapp2.2\microsoft-spark-2.3.x-0.4.0.jar dotnet bin\Debug\netcoreapp2.2\ActiveMqTest.dll
The --packages parameter makes spark-submit automatically download the required Apache Bahir MQTT streaming provider library. Once downloaded, the library is also made available to the submitted job.
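If you use the Linux Docker image instead of a Windows VM, the same submission should look roughly like the sketch below. It is a hypothetical wrapper, not the exact command I ran. It assumes a Debug build for netcoreapp2.2 as above, forward-slash paths, and spark-submit on the PATH. It also spells out the structure of the --packages value. That value is a Maven coordinate of the form groupId:artifactId:version, in which the _2.11 suffix of the artifact ID denotes the Scala version the connector was built for.

```shell
#!/usr/bin/env bash
# Hypothetical Linux variant of the spark-submit call shown above.
# Assumptions: run from the ActiveMqTest folder after `dotnet build`
# (Debug, netcoreapp2.2) and with spark-submit on the PATH.
SCALA_BINARY_VERSION="2.11"   # Scala version the connector was built for
BAHIR_VERSION="2.3.3"         # must match the Spark version
OUT_DIR="bin/Debug/netcoreapp2.2"

# Maven coordinate passed to --packages: groupId:artifactId:version
bahir_package() {
  echo "org.apache.bahir:spark-sql-streaming-mqtt_${SCALA_BINARY_VERSION}:${BAHIR_VERSION}"
}

# Submit the .NET app via DotnetRunner, pulling in the Bahir MQTT source
submit_activemq_test() {
  spark-submit \
    --packages "$(bahir_package)" \
    --class org.apache.spark.deploy.dotnet.DotnetRunner \
    --master local \
    "$OUT_DIR/microsoft-spark-2.3.x-0.4.0.jar" \
    dotnet "$OUT_DIR/ActiveMqTest.dll"
}
```

After sourcing the script from the project folder, running submit_activemq_test should start the same job as the Windows command. Keeping the Scala and Bahir versions in variables makes it obvious which two values have to change together if you move to another Spark release.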
Here is an excerpt of the console output on my Windows VM after submitting.
Getting valid output from PrintSchema() tells me that I successfully connected to ActiveMQ via MQTT.
The streaming query progress provides more details about the related batch of the stream.
So now it is time to start up IanniX and generate some data in real time. Let's watch the console output.
Voilà, the data arrives. Great news!
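If IanniX is not at hand, you can check the pipeline by publishing a few messages to the same topic by hand. The sketch below shows an alternative I did not use in this post. It assumes the mosquitto_pub command-line client from the Eclipse Mosquitto project is installed. It reuses the broker address and topic from Program.cs, which you will need to adjust for your own network. The function name publish_test_messages is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper to publish test messages to the OscStream topic
# without IanniX. Assumptions: the mosquitto_pub client is installed and
# the broker address/topic are the ones used in Program.cs (adjust to yours).
BROKER_HOST="192.168.10.159"
BROKER_PORT="1883"
TOPIC="OscStream"

publish_test_messages() {
  local count="${1:-3}"   # number of messages, defaults to 3
  local i=1
  while [ "$i" -le "$count" ]; do
    # One MQTT PUBLISH per message, each with a numbered text payload
    mosquitto_pub -h "$BROKER_HOST" -p "$BROKER_PORT" -t "$TOPIC" \
      -m "test message $i"
    i=$((i + 1))
  done
}
```

Calling publish_test_messages 5 while the Spark job is running should publish five messages, which should then show up in the console output of the streaming query.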
Outlook
Just writing streaming data to the console is a good way of verifying that I can read from a stream. But of course I want to do more than just that.
So stay tuned for one of my next posts, in which I plan to find out how .NET for Apache Spark can be used to
- transform streaming data
- write the transformed data into a database
Have a great time, and I hope to see you again soon.
8. May 2020