Apache Spark SQL: Baby Names Part 3

Reading Time: < 1 minute

Scanning all of the downloaded files, calculating each name's rank, and adding the year to the results.


import glob
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.window import Window

def processFile(filename, year):
    """Rank baby names in one yearly CSV file and write the results out.

    Reads ``../data/<filename>`` (columns: name, gender, amount), computes a
    dense rank of names within each gender ordered by descending amount,
    tags every row with the given year, and writes a single CSV file to
    ``results/rank_names_partitioned_<year>``.

    Parameters
    ----------
    filename : str
        Name of the CSV data file, resolved relative to ``../data/``.
    year : str
        Year string; appended as a literal column and used in the output path.
    """
    script_dir = os.path.dirname(__file__)
    data_file_path = "../data/" + filename
    data_file = os.path.join(script_dir, data_file_path)

    # BUG FIX: the original builder chain was truncated
    # (`SparkSession.builder.appName(` with no argument and no getOrCreate),
    # which is a syntax error. getOrCreate() reuses one session across calls.
    sparkSession = SparkSession.builder.appName(
        "RankBabyNames").getOrCreate()

    # Explicit schema avoids an extra inference pass over the file.
    schema = StructType([
        StructField('name', StringType(), True),
        StructField('gender', StringType(), True),
        StructField('amount', IntegerType(), True)])

    namesDF = sparkSession.read.schema(schema).csv(data_file)
    # Rank within each gender, most frequent name first.
    nameSpec = Window.partitionBy("gender").orderBy(functions.desc("amount"))

    # dense_rank gives ties the same rank with no gaps afterwards.
    results = namesDF.withColumn(
        "rank", functions.dense_rank().over(nameSpec))

    # coalesce(1) forces a single output CSV part file per year.
    results.withColumn("year", functions.lit(year)).coalesce(1).write.format("csv").mode('overwrite').save(
        "results/rank_names_partitioned_" + year, header="true")


# Driver: process every yearly data file in the working directory.
# The year is recovered by stripping every non-digit character from the name.
for txt_path in glob.glob("*.txt"):
    digits_only = re.sub("[^0-9]", '', txt_path)
    processFile(txt_path, digits_only)

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.