Spark to PostgreSQL – Real-time data processing pipeline – Part 5

Previously I have demonstrated how streaming data can be read and transformed in Apache Spark. This time I use Spark to persist that data in PostgreSQL.

Quick recap – Spark and JDBC

As mentioned in the post related to ActiveMQ, Spark and Bahir, Spark does not provide a JDBC sink out of the box. Therefore, I will have to use the foreach sink and implement an extension of the org.apache.spark.sql.ForeachWriter. It will take each individual data row and write it to PostgreSQL.

Preparing PostgreSQL

TimescaleDB logo

Even though I want to use PostgreSQL, I am actually

more

Real-time data processing pipeline – Part 4 – data transformation

So far, we know how to get our streaming data from ActiveMQ into Spark by using Bahir. On this basis, it is now time to implement the data transformation required to get to the desired output format.

Extract

As a quick reminder, here is the Scala code that I have used so far to retrieve the data from ActiveMQ and write it to a memory sink.

%spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// create a named session
val spark = SparkSession
    .builder()
    .appName("ReadMQTTStream")
    .getOrCreate()

// read data from the OscStream topic
val mqttDf = 
more

ActiveMQ, Spark & Bahir – Real-time data processing pipeline – Part 3

ActiveMQ, Spark & Bahir - Real-time data processing pipeline – Part 3

Having explained how to visually simulate sensor data and how to get it into ActiveMQ during the first two parts, it is now time to explore an initial setup that allows Apache Spark to read a data stream from ActiveMQ using Bahir.

Things to be aware of

Before we start, there are a couple of things you should be aware of, in case you want to follow along and try this out yourself.

Spark & Bahir version matching

Apache Bahir Spark Extensions 2.3.3

As stated above, I would like to subscribe to an ActiveMQ topic with Apache Bahir to transfer

more

Apache Zeppelin mit PySpark und PostgreSQL benutzen – Teil 3

Nachdem wir in Teil 1 und Teil 2 bereits erfahren haben, wie wir Zeppelin direkt mit PostgreSQL benutzen können, widmet sich der letzte Teil der Abfrage der Testdatenbank mit PySpark.

Ehe wir loslegen können, braucht unsere Datenbank erst einmal ein paar Testdaten. Da es in dieser Miniserie nicht um das Abfragen und Aufbereiten von Daten an sich geht, benutze ich für das schnelle Erzeugen von Daten einfach das Tool PgBench. Details dazu könnt ihr unter https://www.postgresql.org/docs/11/pgbench.html nachlesen.

Folgendes Kommando erzeugt dabei 100000 Datensätze in der Tabelle pgbench_accounts

pgbench -d -U testadmin -i test

Nun können wir … more

Apache Zeppelin mit PySpark und PostgreSQL benutzen – Teil 2

Im ersten Teil habe ich beschrieben, wie ich das Docker Image von Apache Zeppelin um die Spark Version 2.4.0 erweitert und kurz getestet habe.

In diesem Teil werde ich einen eigenen Interpreter für PostgreSQL hinzufügen und eine erste Abfrage durchführen, um diesen zu testen. Danach wird noch die Konfiguration für den Spark Interpreter angepasst, damit dieser auch mit PostgreSQL arbeiten kann.

Um den neuen Interpreter hinzuzufügen, klicke ich auf das Drop-Down Menü, rechts im Zeppelin Header und wähle den Punkt Interpreter aus

Mit Create wird die Seite zum Konfigurieren des neuen Interpreters geladen und unter Interpreter more

Scroll to top