Load CSV file with Spark

  • I'm new to Spark and I'm trying to read CSV data from a file. Here's what I'm doing:

    sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()
    I would expect this call to give me a list of the first two columns of my file, but I'm getting this error:

    File "", line 1, in <module>
    IndexError: list index out of range
    although my CSV file has more than one column.
      June 12, 2019 12:08 PM IST
    0
  • Are you sure that all the lines have at least two columns? Can you try something like this, just to check:

    sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()
    Alternatively, you could print the culprit (if any):

    sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()
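    If you only need a count of the offending lines rather than their contents (a minimal variation of the snippet above, assuming the same sc and file), swap the final collect() for count():

    sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .count()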
      June 12, 2019 12:09 PM IST
    0
    • Raji Reddy A
      There are plenty of tools to parse CSV; don't reinvent the wheel.
      June 14, 2019
  • Simply splitting on commas will also break fields that contain commas (e.g. a,b,"1,2,3",c), so it's not recommended. zero323's answer is good if you want to use the DataFrames API, but if you want to stick to base Spark, you can parse CSVs in base Python with the csv module:

    # works for both Python 2 and 3
    import csv
    rdd = sc.textFile("file.csv")
    # csv.reader accepts any iterable of lines, so it can consume a partition directly
    rdd = rdd.mapPartitions(lambda x: csv.reader(x))
    EDIT: As @muon mentioned in the comments, this will treat the header like any other row, so you'll need to extract it manually. For example, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (make sure not to modify header before the filter evaluates; see the sketch below). But at this point, you're probably better off using a built-in CSV parser.
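    Putting the two pieces together (a minimal sketch, assuming the same sc and file.csv as above):

    import csv

    rdd = sc.textFile("file.csv").mapPartitions(lambda x: csv.reader(x))
    header = rdd.first()                         # the first parsed row is the header
    rdd = rdd.filter(lambda row: row != header)  # drop it from the data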
      June 14, 2019 12:36 PM IST
    0
  • This is in PySpark:

    path = "Your file path with file name"

    df = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(path)

    Then you can check:

    df.show(5)
    df.count()
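    Since inferSchema is enabled (note it costs an extra pass over the data), df.printSchema() will show whether the column types were actually inferred rather than left as strings:

    df.printSchema()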
      December 9, 2021 12:35 PM IST
    0
  • Yet another option is to read the CSV file using Pandas and then import the Pandas DataFrame into Spark.

    For example:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    import pandas as pd
    
    sc = SparkContext('local','example')  # if using locally
    sql_sc = SQLContext(sc)
    
    pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
    # pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
    s_df = sql_sc.createDataFrame(pandas_df)
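    On Spark 2.x and later, the same idea works through SparkSession, which replaces SQLContext as the entry point (a minimal sketch):

    from pyspark.sql import SparkSession
    import pandas as pd

    spark = SparkSession.builder.appName('example').getOrCreate()
    pandas_df = pd.read_csv('file.csv')
    s_df = spark.createDataFrame(pandas_df)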
      January 10, 2022 12:26 PM IST
    0
  • from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    
    df = spark.read.csv("/home/stp/test1.csv", header=True, sep="|")  # sep="|" because the file is pipe-delimited
    
    print(df.collect())
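    Note that print(df.collect()) pulls the entire DataFrame back to the driver; for anything beyond a small test file, a bounded peek is safer:

    df.show(5)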
      January 11, 2022 3:52 PM IST
    0