
How to overwrite the output directory in spark

  • I have a Spark Streaming application that produces a dataset every minute. I need to save/overwrite the results of the processed data.

    When I try to overwrite the dataset, an org.apache.hadoop.mapred.FileAlreadyExistsException stops the execution.

    I set the Spark property set("spark.files.overwrite", "true"), but with no luck.

    How can I overwrite or pre-delete the files from Spark?

      November 22, 2021 12:28 PM IST
    0
  • The documentation for the parameter spark.files.overwrite says this: "Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source." So it has no effect on the saveAsTextFiles method.

    You could do this before saving the file:

    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
    // Recursively delete the output path if it exists, so the save cannot hit FileAlreadyExistsException.
    val outPath = new org.apache.hadoop.fs.Path(filepath)
    if (hdfs.exists(outPath)) hdfs.delete(outPath, true)


    As explained here: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html
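    A variant of the same idea that avoids hardcoding the HDFS URI is to reuse the Hadoop configuration Spark already carries. This is a sketch, assuming `sc` is your SparkContext and `filepath` is the output path from the snippet above:

    ```scala
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Reuse the Hadoop configuration Spark already holds, instead of building a new one,
    // so the code works against whatever file system the job is configured for.
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val outPath = new Path(filepath)

    // Delete recursively only if the path exists; nothing is swallowed by a blanket catch.
    if (fs.exists(outPath)) {
      fs.delete(outPath, true)
    }
    ```

    This way the same code runs unchanged against a local file system in tests and HDFS in production.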

      November 27, 2021 10:28 AM IST
    0
  • This overloaded version of the save function works for me:

    yourDF.save(outputPath, org.apache.spark.sql.SaveMode.Overwrite)

    The example above overwrites an existing folder. SaveMode can also take these values (https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html):

    Append: Append mode means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.

    ErrorIfExists: ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.

    Ignore: Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data.
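    On Spark 1.4 and later, the same modes are usually set through the DataFrameWriter API. A minimal sketch, assuming `yourDF` and `outputPath` from above:

    ```scala
    import org.apache.spark.sql.SaveMode

    // Equivalent to the overloaded save above: replace the target directory if it exists.
    yourDF.write.mode(SaveMode.Overwrite).save(outputPath)

    // The other modes plug in the same way:
    // yourDF.write.mode(SaveMode.Append).save(outputPath)        // add to existing data
    // yourDF.write.mode(SaveMode.Ignore).save(outputPath)        // no-op if data exists
    // yourDF.write.mode(SaveMode.ErrorIfExists).save(outputPath) // default: throw
    ```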

      November 29, 2021 11:45 AM IST
    0
  • If you are willing to use your own custom output format, you would be able to get the desired behaviour with RDD as well.

    Have a look at the following classes: FileOutputFormat, FileOutputCommitter

    In FileOutputFormat you have a method named checkOutputSpecs, which checks whether the output directory exists. In FileOutputCommitter you have commitJob, which usually transfers data from the temporary directory to its final place.

    I haven't been able to verify this yet (I will, as soon as I have a few free minutes), but theoretically: if I extend FileOutputFormat and override checkOutputSpecs with a method that doesn't throw an exception when the directory already exists, and adjust the commitJob method of my custom output committer to perform whichever logic I want (e.g. overwrite some of the files, append to others), then I may be able to achieve the desired behaviour with RDDs as well.

    The output format is passed to saveAsNewAPIHadoopFile (the method saveAsTextFile also calls to actually save the files), and the output committer is configured at the application level.
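    As an untested sketch of the idea above (the class name is illustrative): extend the new-API TextOutputFormat and turn checkOutputSpecs into a no-op, so an existing output directory no longer aborts the job:

    ```scala
    import org.apache.hadoop.mapreduce.JobContext
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

    // Illustrative subclass: the stock checkOutputSpecs throws
    // FileAlreadyExistsException when the output directory exists; we skip that check.
    class OverwritingTextOutputFormat[K, V] extends TextOutputFormat[K, V] {
      override def checkOutputSpecs(context: JobContext): Unit = {
        // Intentionally empty: accept a pre-existing output directory.
      }
    }
    ```

    This class would then be passed as the output format class to saveAsNewAPIHadoopFile; any commitJob-side behaviour (overwriting some files, appending to others) would still need the custom FileOutputCommitter described above.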

      December 8, 2021 10:14 AM IST
    0