Spark - load CSV file as DataFrame?

  • I would like to read a CSV file in Spark, convert it to a DataFrame, and store it in HDFS with df.registerTempTable("table_name").

    I have tried:

    scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

    Error which I got:

    java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
        at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
        at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
        at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
        at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
        at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


    What is the right command to load a CSV file as a DataFrame in Apache Spark?

      November 13, 2021 2:41 PM IST
    0
  • In Java 1.8, this code snippet works perfectly for reading CSV files.

    POM.xml

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <!-- spark-csv must use the same _2.11 Scala binary version as the other artifacts -->
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.11</artifactId>
        <version>1.4.0</version>
    </dependency>


    Java

    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
    // create the Spark context
    SparkContext context = new SparkContext(conf);
    // create the Spark session on top of the existing context
    SparkSession sparkSession = new SparkSession(context);
    
    // read the CSV through the spark-csv data source, treating the first row as the
    // header and inferring column types from the data
    Dataset<Row> df = sparkSession.read()
            .format("com.databricks.spark.csv")
            .option("header", true)
            .option("inferSchema", true)
            .load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
    
    System.out.println("========== Print Schema ============");
    df.printSchema();
    System.out.println("========== Print Data ==============");
    df.show();
    System.out.println("========== Print title ==============");
    df.select("title").show();
      December 8, 2021 10:16 AM IST
    0
  • There are a lot of challenges in parsing a CSV file, and they add up as the file gets bigger: non-English characters, escape characters, separators, and other special characters in the column values can all cause parsing errors.

    The magic, then, is in the options that are used. The ones that worked for me, and that should cover most of the edge cases, are in the code below:

    from pyspark.sql import SparkSession
    
    ### Create a Spark Session
    spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
    
    ### Note the options that are used. You may have to tweak these in case of error.
    ### html_csv_file_path holds the path to the CSV file being read.
    html_df = spark.read.csv(html_csv_file_path,
                             header=True,
                             multiLine=True,
                             ignoreLeadingWhiteSpace=True,
                             ignoreTrailingWhiteSpace=True,
                             encoding="UTF-8",
                             sep=',',
                             quote='"',
                             escape='"',
                             maxColumns=2,
                             inferSchema=True)

    Hope that helps. For more, refer to: Using PySpark 2 to read CSV having HTML source code

    Note: The code above uses the Spark 2 API, where the CSV reader comes bundled with the Spark installation itself, so no external package is needed.

    Note: PySpark is a Python wrapper for Spark and shares the same API as Scala/Java.
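
    Since the API is the same across languages, the following is a minimal Scala sketch of an equivalent read. It assumes an already-created SparkSession named spark and a hypothetical variable htmlCsvFilePath holding the CSV path; the options mirror the PySpark call above.

    // Scala sketch of the same read; `spark` is an existing SparkSession and
    // `htmlCsvFilePath` is a hypothetical variable holding the path to the CSV file
    val htmlDf = spark.read
      .option("header", "true")
      .option("multiLine", "true")
      .option("ignoreLeadingWhiteSpace", "true")
      .option("ignoreTrailingWhiteSpace", "true")
      .option("encoding", "UTF-8")
      .option("sep", ",")
      .option("quote", "\"")
      .option("escape", "\"")
      .option("maxColumns", "2")
      .option("inferSchema", "true")
      .csv(htmlCsvFilePath)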

      November 15, 2021 12:30 PM IST
    0
  • With Spark 2.0, here is how you can read a CSV file:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
    // the SparkSession builder creates (or reuses) the underlying SparkContext,
    // so there is no need to construct a SparkContext explicitly
    val sparkSession = SparkSession.builder
      .config(conf = conf)
      .appName("spark session example")
      .getOrCreate()
    
    val path = "/Users/xxx/Downloads/usermsg.csv"
    val base_df = sparkSession.read.option("header", "true").csv(path)
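
    To finish the original question's goal of registering the DataFrame as a temp table, a minimal sketch follows. It reuses base_df and sparkSession from above and uses createOrReplaceTempView, which supersedes the deprecated registerTempTable in Spark 2.x.

    // register the DataFrame under the name used in the original question
    base_df.createOrReplaceTempView("table_name")
    // the registered view can now be queried with plain SQL
    sparkSession.sql("SELECT * FROM table_name LIMIT 10").show()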
      November 17, 2021 12:50 PM IST
    0