Apache DataFu Spark

Guide

Apache DataFu Spark is a collection of utilities and user-defined functions for working with large-scale data in Apache Spark. This guide gives examples of how to use these functions and serves as an overview of the library.

Spark Compatibility

The current version of DataFu has been tested against Spark versions 2.1.x through 2.4.x, with Scala 2.10, 2.11, and 2.12 (where applicable). The JAR for Scala 2.11 has been published to the Apache Maven Repository. Other versions can be built by downloading the source and following the build instructions.

Calling DataFu Spark functions from PySpark

To call the datafu-spark APIs from PySpark, you can do the following (tested on a Hortonworks VM).

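First, start pyspark with the DataFu JAR on the Python path of both the driver and the executors. Use the file name of the JAR you actually have, and use the same name in all three places:

export PYTHONPATH=datafu-spark_2.11-1.6.0.jar

pyspark --jars datafu-spark_2.11-1.6.0.jar --conf spark.executorEnv.PYTHONPATH=datafu-spark_2.11-1.6.0.jar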

The following example calls the Spark version of the DataFu dedup method:

from pyspark_utils.df_utils import PySparkDFUtils

df_utils = PySparkDFUtils()

df_people = sqlContext.createDataFrame([
     ("a", "Alice", 34),
     ("a", "Sara", 33),
     ("b", "Bob", 36),
     ("b", "Charlie", 30),
     ("c", "David", 29),
     ("c", "Esther", 32),
     ("c", "Fanny", 36),
     ("c", "Zoey", 36)],
     ["id", "name", "age"])

func_dedup_res = df_utils.dedup_with_order(dataFrame=df_people, groupCol=df_people.id,
                                orderCols=[df_people.age.desc(), df_people.name.desc()])

func_dedup_res.registerTempTable("dedup")

func_dedup_res.show()

This should produce the following output

+---+-----+---+
| id| name|age|
+---+-----+---+
|  c| Zoey| 36|
|  b|  Bob| 36|
|  a|Alice| 34|
+---+-----+---+
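To make the semantics concrete, here is a minimal plain-Python sketch of what the call above computes: within each id group, only the first row under the ordering (age descending, then name descending) is kept. It uses neither Spark nor DataFu, and the helper name dedup_with_order_local is hypothetical, introduced only for this illustration.

```python
from itertools import groupby

# Same rows as df_people above: (id, name, age)
people = [
    ("a", "Alice", 34), ("a", "Sara", 33),
    ("b", "Bob", 36), ("b", "Charlie", 30),
    ("c", "David", 29), ("c", "Esther", 32),
    ("c", "Fanny", 36), ("c", "Zoey", 36),
]

def dedup_with_order_local(rows):
    """Hypothetical helper: keep one row per id, preferring the highest age
    and, on ties, the name that sorts last (age desc, name desc)."""
    by_id = sorted(rows, key=lambda r: r[0])  # groupby needs sorted input
    result = []
    for _, group in groupby(by_id, key=lambda r: r[0]):
        # The max over (age, name) is the first row under "age desc, name desc".
        result.append(max(group, key=lambda r: (r[2], r[1])))
    return result

deduped = dedup_with_order_local(people)
for row in deduped:
    print(row)
```

The sketch returns the same three rows as the table above (Alice, Bob, Zoey), though in id order rather than in the order Spark happens to print them.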

Using DataFu to do Skewed Joins

DataFu-Spark contains two methods for doing skewed joins. This section covers one of them, broadcastJoinSkewed.

broadcastJoinSkewed can be used when one data frame is skewed and the other is not. It splits both data frames into two parts according to the skewed keys. For example, suppose we have two data frames. The first, customers, isn't skewed:

+-------+-----------+
|company|year_joined|
+-------+-----------+
| paypal|       2017|
| myshop|       2019|
+-------+-----------+

And transactions, which is skewed on the field company:

+--------------+-------+
|transaction_id|company|
+--------------+-------+
|             1| paypal|
|             2| paypal|
|             3| paypal|
|             4| paypal|
|             5| paypal|
|             6| paypal|
|             7| paypal|
|             8| paypal|
|             9| myshop|
+--------------+-------+

To join them, we need to decide how many of the most frequent (skewed) keys we would like to broadcast. In our case, with only one skewed key, we use 1, like this:

import datafu.spark.DataFrameOps._  // adds broadcastJoinSkewed to DataFrame
val result = customers.broadcastJoinSkewed(transactions, "company", 1)

The result will look like this, just as if we had used a regular join.

+-------+-----------+--------------+
|company|year_joined|transaction_id|
+-------+-----------+--------------+
| myshop|       2019|             9|
| paypal|       2017|             1|
| paypal|       2017|             2|
| paypal|       2017|             3|
| paypal|       2017|             4|
| paypal|       2017|             5|
| paypal|       2017|             6|
| paypal|       2017|             7|
| paypal|       2017|             8|
+-------+-----------+--------------+
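The idea of splitting both sides by the skewed keys can be sketched in plain Python. This is only an illustration of the technique described above, not DataFu's implementation: the function name skewed_join_sketch and its parameter names are hypothetical, and dictionary lookups stand in for both the broadcast join and the regular join.

```python
from collections import Counter

# Same data as the tables above.
customers = [("paypal", 2017), ("myshop", 2019)]  # (company, year_joined)
transactions = [(i, "paypal") for i in range(1, 9)] + [(9, "myshop")]  # (transaction_id, company)

def skewed_join_sketch(not_skewed, skewed, num_keys_to_broadcast):
    """Hypothetical illustration: split an inner join by the most frequent keys.
    Assumes not_skewed has at most one row per key."""
    # 1. Find the most frequent join keys on the skewed side.
    counts = Counter(company for _, company in skewed)
    hot_keys = {key for key, _ in counts.most_common(num_keys_to_broadcast)}

    # 2. Split the non-skewed side into rows for hot keys and the rest.
    hot_lookup = {c: y for c, y in not_skewed if c in hot_keys}
    rest_lookup = {c: y for c, y in not_skewed if c not in hot_keys}

    # 3. Hot keys: join against the small lookup (stand-in for a broadcast join).
    hot_part = [(c, hot_lookup[c], t) for t, c in skewed if c in hot_lookup]
    # 4. Remaining keys: stand-in for a regular join.
    rest_part = [(c, rest_lookup[c], t) for t, c in skewed if c in rest_lookup]

    # 5. Union the two parts; sorting only makes the output deterministic.
    return sorted(hot_part + rest_part)

result = skewed_join_sketch(customers, transactions, 1)
for row in result:
    print(row)
```

With one key to broadcast, paypal is handled by the lookup path and myshop by the regular path, and the union contains the same nine rows as the table above.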

Doing a join between a number and a range

An interesting type of join that DataFu supports is between a point and a range. A naive solution might explode each range into one row per value, but this could make the table huge. The DataFu joinWithRange method takes a decrease factor to deal with this problem.

As an example, imagine two data frames, one of graded papers and one representing a scoring system.

The dataframe for grades might look like this:

+-----+-------+
|grade|student|
+-----+-------+
|   37| tyrion|
|   72|   robb|
|   83| renley|
|   64|    ned|
|   95|  sansa|
|   88|   arya|
|   79| cersei|
|   81|  jaime|
+-----+-------+

The scoring system might look like this:

+-----+---+---+
|grade|min|max|
+-----+---+---+
|    A| 90|100|
|    B| 80| 90|
|    C| 70| 80|
|    D| 60| 70|
|    F|  0| 60|
+-----+---+---+

We will use a decrease factor of 10, since each range has a size of at least 10. In the call below, skewed is the grades data frame and notskewed is the scoring data frame:

skewed.joinWithRange("grade", notskewed, "min", "max", 10).show

Our result will be as follows:

+-----+-------+-----+---+---+
|grade|student|grade|min|max|
+-----+-------+-----+---+---+
|   37| tyrion|    F|  0| 60|
|   72|   robb|    C| 70| 80|
|   83| renley|    B| 80| 90|
|   64|    ned|    D| 60| 70|
|   95|  sansa|    A| 90|100|
|   88|   arya|    B| 80| 90|
|   79| cersei|    C| 70| 80|
|   81|  jaime|    B| 80| 90|
+-----+-------+-----+---+---+
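One way a decrease factor can keep the exploded table small is to expand each range into coarse buckets rather than individual values, join on the bucket, and then filter on the exact bounds. The plain-Python sketch below illustrates that idea on the example data. It is an assumption about the technique, not a description of DataFu's internals; the function name is hypothetical, and treating both range ends as inclusive is also an assumption.

```python
# Same data as the tables above.
grades = [(37, "tyrion"), (72, "robb"), (83, "renley"), (64, "ned"),
          (95, "sansa"), (88, "arya"), (79, "cersei"), (81, "jaime")]
scoring = [("A", 90, 100), ("B", 80, 90), ("C", 70, 80), ("D", 60, 70), ("F", 0, 60)]

def join_with_range_sketch(points, ranges, decrease_factor):
    """Hypothetical illustration of a point-in-range join using coarse buckets."""
    # Expand each range into buckets of width decrease_factor.
    buckets = {}
    for label, lo, hi in ranges:
        for b in range(lo // decrease_factor, hi // decrease_factor + 1):
            buckets.setdefault(b, []).append((label, lo, hi))

    joined = []
    for value, student in points:
        # Join on the bucket, then keep only ranges that really contain the point.
        for label, lo, hi in buckets.get(value // decrease_factor, []):
            if lo <= value <= hi:  # assumption: both ends inclusive
                joined.append((value, student, label, lo, hi))
    return joined

joined = join_with_range_sketch(grades, scoring, 10)
for row in joined:
    print(row)
```

In this sketch the F range (0 to 60) expands into 7 bucket rows instead of 61 value rows, which is where the saving comes from.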

To use joinWithRange, the tables need to meet two requirements:

  1. the points table (grades in our example) needs to be distinct on the point column
  2. the range and point columns need to be numeric
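The two requirements above can be checked before joining. The following plain-Python sketch shows the checks on in-memory rows; the function name check_range_join_inputs is hypothetical and not part of DataFu.

```python
from numbers import Number

def is_numeric(value):
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, Number) and not isinstance(value, bool)

def check_range_join_inputs(points, ranges):
    """Hypothetical pre-check. points: rows whose first field is the point;
    ranges: rows of (label, range_start, range_end).
    Returns (points_are_distinct, columns_are_numeric)."""
    values = [row[0] for row in points]
    distinct = len(values) == len(set(values))
    numeric = (all(is_numeric(v) for v in values)
               and all(is_numeric(lo) and is_numeric(hi) for _, lo, hi in ranges))
    return distinct, numeric

scoring = [("A", 90, 100), ("B", 80, 90)]
ok = check_range_join_inputs([(95, "sansa"), (88, "arya")], scoring)
duplicated = check_range_join_inputs([(95, "sansa"), (95, "arya")], scoring)
non_numeric = check_range_join_inputs([("95", "sansa")], scoring)
print(ok, duplicated, non_numeric)
```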

If there are ranges that overlap, a point that matches will be joined to all the ranges that include it. In order to take only one range per point, you can use the joinWithRangeAndDedup method.

It takes the same parameters as joinWithRange, plus one more: whether to match the largest or the smallest range that contains a point.
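The effect of choosing the smallest or largest containing range can be illustrated in plain Python. This sketch is not DataFu code: the function name, the keep_smallest parameter, and the sample ranges are all hypothetical, and dropping points that match no range (inner-join behaviour) is an assumption.

```python
def join_with_range_and_dedup_sketch(points, ranges, keep_smallest=True):
    """Hypothetical illustration: for each point keep exactly one containing range,
    either the smallest or the largest by width."""
    pick = min if keep_smallest else max
    result = []
    for value in points:
        matches = [(lo, hi) for lo, hi in ranges if lo <= value <= hi]
        if not matches:
            continue  # assumption: unmatched points are dropped
        lo, hi = pick(matches, key=lambda r: r[1] - r[0])
        result.append((value, lo, hi))
    return result

# Made-up overlapping ranges: 50 falls in all three, 70 only in the widest.
overlapping = [(0, 100), (40, 60), (45, 55)]

smallest = join_with_range_and_dedup_sketch([50, 70], overlapping, keep_smallest=True)
largest = join_with_range_and_dedup_sketch([50, 70], overlapping, keep_smallest=False)
print(smallest)  # [(50, 45, 55), (70, 0, 100)]
print(largest)   # [(50, 0, 100), (70, 0, 100)]
```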