Building a simple Spark data pipeline

Recently I got a call from one of my old colleagues, someone I had worked with at one of my previous jobs. She was describing a requirement that involved streaming data ingestion and a data pipeline that used a lot of UDFs (User Defined Functions). I had moved away from such projects and had not done anything similar for quite some time, but the conversation piqued my curiosity. So I decided to create a very simple data pipeline project in Java.

My simple project involves the following use case. I have a JSON file feed with base employee information that will be dropped into a specific directory. I also have a text file feed with each employee's monthly salary for a year, along with bonus information. I want to produce a JSON output after enriching this data. The enrichment/transformation consists of the following steps:

  • Aggregate the monthly salaries into an annual salary
  • Remove any extension from the phone numbers
  • Calculate the total salary including the bonus amount (if any)

I decided on a few things at the outset. Data sanity checks would not be a priority. I decided to use Spark for the data ingestion. I also debated whether I should supply a schema while loading the datasets, but finally decided against it.

So, what is data ingestion?

Data ingestion is the process of consuming data from one or more sources (such as a database, a filesystem, TCP, or a stream), applying some transformation and/or enrichment, and finally pushing it to a data lake or data warehouse. This may sound very much like an ETL (Extract, Transform, Load) process, but there are some subtle differences. Data ingestion typically involves a number of different tools working in tandem, and it can perform real-time analytics while the data is in motion. It has become an important part of business strategy today.

Let’s Start

First, let me define all the Java libraries I am including in my Maven project.

<dependencies>
  <!-- JSON parser -->
  <dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.47</version>
  </dependency>

  <!-- Faker -->
  <dependency>
    <groupId>net.datafaker</groupId>
    <artifactId>datafaker</artifactId>
    <version>1.9.0</version>
  </dependency>

  <!-- Lombok -->
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.32</version>
    <scope>provided</scope>
  </dependency>

  <!-- Spark -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.13</artifactId>
    <version>3.3.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.13</artifactId>
    <version>3.3.2</version>
  </dependency>
</dependencies>

I needed a JSON library to write a JSON file, so the fastjson2 library is included. Lombok, of course, comes along in all my projects because I am too lethargic to write getters and setters; additionally, it gives me a logger implementation for free. For the sample files, I wanted something that looks real, so I used Datafaker to generate mock data. Since I am not using Spark Streaming in this project, I only imported spark-core and spark-sql.
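
To illustrate what Lombok buys me, here is a minimal sketch (the class is just an illustrative placeholder, not part of the pipeline): @Data generates the getters, setters, equals/hashCode and toString, and @Slf4j injects an SLF4J logger named log, assuming an SLF4J implementation is on the classpath (Spark already pulls one in).

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

// Illustrative class: @Data gives getters/setters/equals/hashCode/toString,
// @Slf4j gives a ready-made "log" field (the same "log" used later in the
// pipeline class).
@Slf4j
@Data
public class LombokSketch {

    private String someField;

    public void logIt() {
        log.info("someField is currently: {}", someField);
    }
}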

Creating the fake files

Fake Employee Master

As discussed before, we will need a couple of files to read from. The first is the employee master file, which does not contain salary information. The model for this file looks as follows.

import java.io.Serializable;

import lombok.Data;

@Data
public class EmployeeBase implements Serializable {
    
    private static final long serialVersionUID = 1L;
    
    private String employeeId;
    private String jobTitle;
    private int    grade;
    private String firstName;
    private String lastName;
    private String phoneNumber;
    private String email;
}

Next we use Datafaker (new Faker()) to generate some nice-looking data. I pass it a set of pre-generated employee IDs. The reason I pre-generate the IDs is so that I can create salary information for the same set of employees.
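
The ID pre-generation itself is not shown here; below is a minimal sketch of how such IDs could be produced with Datafaker. The helper name and the two-letters-plus-eight-digits pattern are my assumptions, based on the sample IDs shown later (e.g. MD27149156).

// Hypothetical helper: build IDs like "MD27149156". bothify() replaces
// every '?' with a random letter and every '#' with a random digit.
private List<String> generateEmployeeIds(final int count) {
  final List<String> ids = new ArrayList<>();
  for (int i = 0; i < count; i++) {
    ids.add(faker.bothify("??########").toUpperCase());
  }
  return ids;
}

With a list of IDs in hand, the employee generator itself looks like this.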

/**
 * Fake employee
 * @param ids
 * @return
 */
private List<EmployeeBase> fakeEmployeeGenerator(final List<String> ids) {

  final List<EmployeeBase> empList = new ArrayList<>();
  for (final String id : ids) {
    final EmployeeBase emp = new EmployeeBase();
    final String firstName = faker.name().firstName();
    final String lastName = faker.name().lastName();
    emp.setEmployeeId(id);
    emp.setFirstName(firstName);
    emp.setLastName(lastName);
    emp.setGrade(Integer.parseInt(faker.options().option("600", "700", "800", "900", "1000")));
    emp.setJobTitle(faker.job().title());
    emp.setEmail(firstName.toLowerCase() + "." + lastName.toLowerCase() + "@mycompany.com");
    emp.setPhoneNumber(faker.phoneNumber().phoneNumberNational());
    empList.add(emp);
  }

  return empList;
}

/**
 * JSONify the list so we can save to a file
 * @param ids
 * @return
 */
public String fakeEmployeeGeneratorJson(final List<String> ids) {
  final List<EmployeeBase> empList = fakeEmployeeGenerator(ids);
  return JSON.toJSONString(empList);
}

That’s it for the employee file. The only thing remaining is saving this JSON string to a file; a sketch for that follows after the salary section.

Fake Employee Salaries

Next up, salaries. Let’s start with the model again.

import java.io.Serializable;

import lombok.Data;

@Data
public class EmployeeSalaries implements Serializable {
    
    private static final long serialVersionUID = 1L;

    private String employeeId;
    private double salary;
    private int    bonusPercent;

    @Override
    public String toString() {
        final StringBuilder str = new StringBuilder();
        str.append(employeeId).append(",").append(salary).append(",").append(bonusPercent);
        return str.toString();
    }
}

As you can see, I have added a toString() override. I could not use any of the variations that Lombok provides, as I need each record to be a CSV line. This will be used later.

Let’s now get to populating this. We will generate a random salary for each of 12 months, and we will also give each employee a random fixed bonus percentage.

/**
 * Fake salaries for employees
 * @param ids  
 * @return
 */
private List<EmployeeSalaries> fakeEmployeeSalaries(final List<String> ids) {
  final List<EmployeeSalaries> empSals = new ArrayList<>();
  for (final String id : ids) {

    final int bonusAmt = Integer.parseInt(faker.numerify("#"));
    for (int i=0; i<12; i++) {
      final EmployeeSalaries sal = new EmployeeSalaries();
      sal.setEmployeeId(id);
      sal.setSalary(Integer.parseInt(faker.numerify("####")));
      sal.setBonusPercent(bonusAmt);
      empSals.add(sal);
    }
  }

  return empSals;
}

/**
 * Return the employee salary list as CSV
 * @param ids
 * @return
 */
public String fakeEmployeeSalariesCsv(final List<String> ids) {
  final List<EmployeeSalaries> empSals = fakeEmployeeSalaries(ids);
  return empSals.stream().map(Object::toString).collect(Collectors.joining("\n"));
}

So, those are the two methods that generate the outputs. We store the results in empbase.json and empsals.csv so we can use them later.
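
The file writing itself is plain Java; here is a minimal sketch, assuming Java 11+ (for Files.writeString), that both generator methods live on the same class, and a hypothetical in/ directory that the pipeline reads from later.

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Sketch: persist the generated JSON and CSV strings so the Spark job can
// read them later. The "in/" directory is an assumption; use whatever
// location your pipeline reads from.
public void writeFakeFiles(final List<String> ids) throws Exception {
  Files.createDirectories(Path.of("in"));
  Files.writeString(Path.of("in/empbase.json"), fakeEmployeeGeneratorJson(ids));
  Files.writeString(Path.of("in/empsals.csv"), fakeEmployeeSalariesCsv(ids));
}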

Here is what a record from each file looks like:

{
  "email": "veronika.denesik@mycompany.com",
  "employeeId": "MD27149156",
  "firstName": "Veronika",
  "grade": 800,
  "jobTitle": "Future Administrator",
  "lastName": "Denesik",
  "phoneNumber": "(251) 251-3798 x4134"
}

And the salary file records for the same employee:

MD27149156,1262.0,0
MD27149156,7433.0,0
MD27149156,5136.0,0
MD27149156,6942.0,0
MD27149156,5993.0,0
MD27149156,984.0,0
MD27149156,6649.0,0
MD27149156,4274.0,0
MD27149156,4148.0,0
MD27149156,9551.0,0
MD27149156,23.0,0
MD27149156,6554.0,0

Spark Data Pipeline

Okay, now that that’s out of the way, let’s get to the crux of the project. At this point we have two files: one with the basic employee information and the other with 12 months of salary data for each employee.

We will start by creating a static method to get the Spark session object that will be used across the board.

/**
 * Return the spark session
 * @return
 */
public static SparkSession getSparkSession() {
  return SparkSession.builder()
    .appName("Employee Pipeline Test")
    .master("local")
    .getOrCreate();
} 

Loading Files

Spark already provides ways of loading both JSON and CSV files. We will use those to load the files into corresponding datasets.

/**
 * This loads employee base file
 * @param empFile
 * @return
 */
private Dataset<Row> loadEmployeeBaseFile(final String empFile) {

  final Dataset<Row> df = SparkConfig.getSparkSession()
    .read()
    .format("org.apache.spark.sql.execution.datasources.json.JsonFileFormat")
    .option("multiline", true)
    .load(empFile);
  return df;
}

/**
 * This loads employee salary file from CSV
 * @param empSal
 * @return
 */
private Dataset<Row> loadEmployeeSalaryFile(final String empSal) {

  final Dataset<Row> df = SparkConfig.getSparkSession()
    .read()
    .csv(empSal)
    .withColumnRenamed("_c0", "employeeId")
    .withColumn("salary", functions.col("_c1").cast("int"))
    .withColumn("bonus", functions.col("_c2").cast("int"))
    .drop(functions.col("_c1"))
    .drop(functions.col("_c2"));

  // Group by employeeId (and bonus), sum the monthly salaries, and sort by annual salary descending
  final Dataset<Row> dfAgg = df
    .groupBy(df.col("employeeId"), functions.col("bonus"))
    .sum("salary")
    .withColumnRenamed("sum(salary)", "annualSalary")
    .orderBy(functions.col("annualSalary").desc());
  return dfAgg;
}

Let’s consider the two methods defined above. The first one (loadEmployeeBaseFile) is fairly simple: it reads the employee JSON file and loads every record into a dataset. The second method (loadEmployeeSalaryFile) does some additional work. Since the CSV file has no header, the columns come in as _c0, _c1 and _c2. We rename _c0 to employeeId, turn _c1 into an int column called salary, and turn _c2 into an int column called bonus.
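
As an aside, this is where the schema question from earlier comes in. I went with the default columns and renamed them, but for reference, here is a minimal sketch of the alternative: passing an explicit schema so the columns arrive already named and typed (the column names simply mirror the renames above).

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Alternative load: declare the CSV schema up front instead of renaming and
// casting the default _c0/_c1/_c2 columns afterwards.
private Dataset<Row> loadEmployeeSalaryFileWithSchema(final String empSal) {
  final StructType schema = new StructType()
      .add("employeeId", DataTypes.StringType)
      .add("salary", DataTypes.IntegerType)
      .add("bonus", DataTypes.IntegerType);

  return SparkConfig.getSparkSession()
      .read()
      .schema(schema)
      .csv(empSal);
}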

Back in loadEmployeeSalaryFile, we also take care of the aggregation. Remember that we have 12 rows per employee, one for each month's salary, so we aggregate them into an annual salary before returning the dataset.

Let’s check out the datasets returned. Here are the top 5 records from the employee master.

email              | employeeId | firstName | grade | jobTitle             | lastName   | phoneNumber
veronika.denesik@… | MD27149156 | Veronika  | 800   | Future Administrator | Denesik    | (251) 251-3798 x4134
kaleigh.schowalte… | IW99502891 | Kaleigh   | 700   | Healthcare Specia…   | Schowalter | (601) 863-3767 x2879
geoffrey.langwort… | YQ88107128 | Geoffrey  | 700   | Direct Government…   | Langworth  | (810) 470-9464
nathan.schimmel@m… | DQ54216887 | Nathan    | 800   | Real-Estate Assoc…   | Schimmel   | (765) 703-4485 x5730
willis.franecki@m… | HS13538780 | Willis    | 1000  | Farming Analyst      | Franecki   | (215) 408-7790
Employee Master

Now let’s see what the salary dataset looks like.

employeeId | bonus | annualSalary
HS13538780 | 0     | 83422
JC71588660 | 9     | 81211
CR86224966 | 2     | 78511
WD73803326 | 3     | 77681
HJ46328666 | 3     | 75268
Employee Salary

As you can see in this dataset, we now have annualSalary, which is the aggregation of the monthly salaries.

/**
 * Main method that does the example processing
 * @param empFile
 * @param empSal
 */
public void processPipeline(final String empFile, final String empSal) {

  final Dataset<Row> empDataset  = loadEmployeeBaseFile(empFile);
  final Dataset<Row> empSalaries = loadEmployeeSalaryFile(empSal);

  log.info("Employee Count: " + empDataset.count());
  empDataset.show(10);
  log.info("Salaries Count: " + empSalaries.count());
  empSalaries.show(10);

  // Join the two
  final Dataset<Row> fullDataset = empDataset.join(empSalaries, 
    empSalaries.col("employeeId").equalTo(empDataset.col("employeeId")))
    .drop(empDataset.col("employeeId"));
  fullDataset.show(10);
  
  ... contd ...
}

Next we join the two datasets using employeeId as the key. I also wrote a couple of user defined functions. For the transformation I was doing, these were not strictly necessary, but one of my goals was to build UDFs.

What is a UDF?

Many of these tools provide a way to extend the system with functionality that is not available out of the box. The mechanism they provide is the user defined function. You write custom functions in Java and register them with the underlying engine, and those functions then become available just like the built-in ones.

Here we write two UDFs. The first one trims phone numbers to remove extensions, and the second one returns the salary after adding the bonus amount.

/**
 * This is a sample for registering couple of UDFs
 */
private void registerUdfs() {

  // We will register two UDFs
  // 1. Trim the phone number to drop the extension. This assumes the fixed
  //    "(xxx) xxx-xxxx" prefix produced by the faker, i.e. the first 14 characters.
  SparkConfig.getSparkSession().udf().register("TRIMPHONE", 
             new UDF1<String, String>() {
             @Override
             public String call(final String phoneNumber) {
                 return phoneNumber.substring(0, 14);
             }
  }, DataTypes.StringType);

  // 2. Calculate the total salary (annual salary plus the bonus percent applied)
  SparkConfig.getSparkSession().udf().register("SALWITHBONUS", 
              new UDF2<Long, Integer, Integer>() {
              @Override
              public Integer call(final Long salary, final Integer bonus) {
                  return (bonus==0) ? 
                    salary.intValue() : 
                    salary.intValue() + (salary.intValue() * bonus/100);
              }
  }, DataTypes.IntegerType);
}

The first UDF takes one parameter, so we use UDF1. The second one takes two parameters, so we use UDF2. Spark will now let us use the TRIMPHONE and SALWITHBONUS functions in SQL expressions.
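
As an aside, registering the functions by name is what makes them usable inside selectExpr strings; the same registered UDFs can also be invoked through the DataFrame API. A minimal sketch, assuming Spark's functions.callUDF helper (still available in 3.3, though the newer call_udf name is preferred):

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

// Equivalent to the selectExpr version below, expressed with Column objects.
final Dataset<Row> viaApi = fullDataset.select(
    col("employeeId"),
    col("firstName"),
    col("lastName"),
    callUDF("TRIMPHONE", col("phoneNumber")).alias("phoneNumber"),
    callUDF("SALWITHBONUS", col("annualSalary"), col("bonus")).alias("fullSalary"));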

So, we create a new dataset using the two functions and save it.

... contd ...

  // Register the UDFs
  registerUdfs();

  // Apply the UDFs while selecting the columns we want to keep
  final Dataset<Row> updDataset = fullDataset.selectExpr(
    "employeeId", 
    "firstName", 
    "lastName", 
    "TRIMPHONE(phoneNumber)", 
    "SALWITHBONUS(annualSalary, bonus)")
    .withColumnRenamed("TRIMPHONE(phoneNumber)", "phoneNumber")
    .withColumnRenamed("SALWITHBONUS(annualSalary, bonus)", "fullSalary");
  updDataset.show();

  updDataset.write().json("out/employee_data/");

The output looks as follows (first 5 records):

employeeId | firstName | lastName  | phoneNumber    | fullSalary
WD73803326 | Denyse    | Grady     | (612) 214-3406 | 80011
UF59774513 | Sharda    | McClure   | (928) 845-1035 | 61373
OW00373862 | Kara      | Schoen    | (314) 213-7396 | 51507
HS13538780 | Willis    | Franecki  | (215) 408-7790 | 83422
XJ74737569 | Shu       | Kertzmann | (843) 208-4488 | 67920
Final Output
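
One note on the write: updDataset.write().json("out/employee_data/") produces a directory of part files (one per partition) rather than a single JSON file. That is fine here, but if a single output file is wanted, one option is to coalesce to one partition first. A minimal sketch, reasonable for a toy dataset but not something to do on large data:

// Collapse to a single partition so Spark emits one part file.
// Avoid on large data, since it funnels everything through one task.
updDataset
  .coalesce(1)
  .write()
  .mode("overwrite")
  .json("out/employee_data/");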

Conclusion

That’s it! We started with two separate files, ingested them, did some transformations, and finally wrote out a new file with the enriched data. Even though I talked about streaming at the beginning of this blog, I have not written any code for it here. We will take up streaming data ingestion from Kafka in the next blog. Till then, ciao!