QBoard » Big Data » Big Data - Spark » How to perform union on two DataFrames with different numbers of columns in Spark?

How to perform union on two DataFrames with different numbers of columns in Spark?

  • I have 2 DataFrames:


    Source data

    I need union like this:

    (screenshot of the expected union result — image not reproduced here)

    The unionAll function doesn't work because the number and the name of columns are different.

    How can I do this?

      September 15, 2021 11:31 PM IST
    0
  • A very simple way to do this - select the columns in the same order from both the dataframes and use unionAll (note: unionAll is a deprecated alias of union since Spark 2.0)

    # `lit` is required for the null placeholder columns.
    from pyspark.sql.functions import lit

    # Select the same columns, in the same order, from both frames — filling
    # the columns each side lacks with null literals — then union positionally.
    # (unionAll is a deprecated alias of union since Spark 2.0.)
    df1.select('code', 'date', 'A', 'B', 'C', lit(None).alias('D'), lit(None).alias('E'))\
       .unionAll(df2.select('code', 'date', lit(None).alias('A'), 'B', 'C', 'D', 'E'))
      October 9, 2021 1:13 PM IST
    0
  • Here is the code for Python 3 using pyspark:

    from pyspark.sql.functions import lit
    
    
    def __order_df_and_add_missing_cols(df, columns_order_list, df_missing_fields):
        """Return df projected onto columns_order_list, substituting a null
        literal for every column named in df_missing_fields."""
        # Fast path: nothing is missing, just reorder.
        if not df_missing_fields:
            return df.select(columns_order_list)
        # Keep existing columns as-is; replace missing ones with null literals.
        selection = [
            name if name not in df_missing_fields else lit(None).alias(name)
            for name in columns_order_list
        ]
        return df.select(selection)
    
    
    def __add_missing_columns(df, missing_column_names):
        """Append each name in missing_column_names to df as a null column,
        after all of df's existing columns."""
        null_columns = [lit(None).alias(name) for name in missing_column_names]
        return df.select(df.schema.names + null_columns)
    
    
    def __order_and_union_d_fs(left_df, right_df, left_list_miss_cols, right_list_miss_cols):
        """Union the two frames, with the column order dictated by left_df."""
        # Pad the left frame first; its resulting column order is the template.
        padded_left = __add_missing_columns(left_df, left_list_miss_cols)
        # Pad and reorder the right frame to match the left frame's columns.
        padded_right = __order_df_and_add_missing_cols(
            right_df, padded_left.schema.names, right_list_miss_cols)
        return padded_left.union(padded_right)
    
    
    def union_d_fs(left_df, right_df):
        """Union between two dataFrames; if either frame lacks columns the
        other has, the missing columns are appended as nulls.

        Raises ValueError when either input is None.
        """
        # Guard clauses for None inputs.
        if left_df is None:
            raise ValueError('left_df parameter should not be None')
        if right_df is None:
            raise ValueError('right_df parameter should not be None')

        left_names = left_df.schema.names
        right_names = right_df.schema.names
        # Identical columns in identical order: a plain union suffices.
        if left_names == right_names:
            return left_df.union(right_df)

        # Columns present on one side only, per side.
        right_miss = list(set(left_names) - set(right_names))
        left_miss = list(set(right_names) - set(left_names))
        return __order_and_union_d_fs(left_df, right_df, left_miss, right_miss)
      September 23, 2021 1:42 PM IST
    0
  • Here is my Python version:

    from pyspark.sql import SparkSession, HiveContext
    from pyspark.sql.functions import lit
    from pyspark.sql import Row
    
    def customUnion(df1, df2):
        """Union df1 and df2 over the sorted union of their column names,
        null-filling the columns each frame lacks."""
        cols1 = df1.columns
        cols2 = df2.columns
        # All column names, df1's plus df2-only extras, in sorted order.
        total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))

        def build_selection(present, wanted):
            # Keep a column if the frame has it; otherwise inject a null literal.
            return [c if c in present else lit(None).alias(c) for c in wanted]

        left = df1.select(build_selection(cols1, total_cols))
        right = df2.select(build_selection(cols2, total_cols))
        return left.union(right)

     

    Here is sample usage:

    # Sample usage: two frames with one shared column (zip_code) and one
    # side-specific column each (dma vs name).
    # NOTE(review): zip_code is an int here but a str in the second frame —
    # the union of those columns may fail or coerce types; confirm before use.
    data = [
        Row(zip_code=58542, dma='MIN'),
        Row(zip_code=58701, dma='MIN'),
        Row(zip_code=57632, dma='MIN'),
        Row(zip_code=58734, dma='MIN')
    ]
    
    # Requires an active SparkSession named `spark` (e.g. from pyspark shell).
    firstDF = spark.createDataFrame(data)
    
    data = [
        Row(zip_code='534', name='MIN'),
        Row(zip_code='353', name='MIN'),
        Row(zip_code='134', name='MIN'),
        Row(zip_code='245', name='MIN')
    ]
    
    secondDF = spark.createDataFrame(data)
    
    # Missing columns on each side are filled with nulls by customUnion.
    customUnion(firstDF,secondDF).show()
      September 30, 2021 12:34 PM IST
    0
  • Here's a pyspark solution.

    It assumes that if a field in df1 is missing from df2, then you add that missing field to df2 with null values. However it also assumes that if the field exists in both dataframes, but the type or nullability of the field is different, then the two dataframes conflict and cannot be combined. In that case I raise a TypeError.

    from pyspark.sql.functions import lit
    
    def harmonize_schemas_and_combine(df_left, df_right):
        """Union df_left and df_right after reconciling their schemas.

        Any field present in only one frame is added to the other as a null
        literal cast to the original field's type.  If a field exists in both
        frames but its dataType or nullability differs, the schemas conflict
        and a TypeError is raised.  Output columns follow df_right's order.
        """
        left_types = {f.name: f.dataType for f in df_left.schema}
        right_types = {f.name: f.dataType for f in df_right.schema}
        left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
        right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

        # First go over left-unique fields.
        # FIX: the original used the Python 2 `raise TypeError, "..."` statement
        # form, which is a SyntaxError in Python 3; converted to call syntax.
        for l_name, l_type, l_nullable in left_fields.difference(right_fields):
            if l_name in right_types:
                # The name exists on both sides, so the (name, type, nullable)
                # tuple differs in type or nullability — both are conflicts.
                r_type = right_types[l_name]
                if l_type != r_type:
                    raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
                else:
                    raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not(l_nullable)))
            # Field missing entirely from the right frame: add it as typed null.
            df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

        # Now go over right-unique fields.
        for r_name, r_type, r_nullable in right_fields.difference(left_fields):
            if r_name in left_types:
                l_type = left_types[r_name]
                if r_type != l_type:
                    raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
                else:
                    raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not(r_nullable)))
            df_left = df_left.withColumn(r_name, lit(None).cast(r_type))

        # Make sure columns are in the same order before the positional union.
        df_left = df_left.select(df_right.columns)

        return df_left.union(df_right)

     

      January 15, 2022 12:52 PM IST
    0
  • Spark 3.1+

    # Spark 3.1+: matches columns by name and null-fills the ones either side lacks.
    df = df1.unionByName(df2, allowMissingColumns=True)
    

     

    Test results:

    from pyspark.sql import SparkSession
    
    # Local session for the demonstration.
    spark = SparkSession.builder.getOrCreate()
    
    # df1 has columns A/B/C; df2 has B/C/D/E — each is missing columns the other has.
    data1=[
    (1 , '2016-08-29', 1 , 2, 3),
    (2 , '2016-08-29', 1 , 2, 3),
    (3 , '2016-08-29', 1 , 2, 3)]
    df1 = spark.createDataFrame(data1, ['code' , 'date' , 'A' , 'B', 'C'])
    data2=[
    (5 , '2016-08-29', 1, 2, 3, 4),
    (6 , '2016-08-29', 1, 2, 3, 4),
    (7 , '2016-08-29', 1, 2, 3, 4)]
    df2 = spark.createDataFrame(data2, ['code' , 'date' , 'B', 'C', 'D', 'E'])
    
    # allowMissingColumns=True (Spark 3.1+) null-fills A for df2 and D/E for df1;
    # expected output is shown in the commented table below.
    df = df1.unionByName(df2, allowMissingColumns=True)
    df.show()
    
    #     +----+----------+----+---+---+----+----+
    #     |code|      date|   A|  B|  C|   D|   E|
    #     +----+----------+----+---+---+----+----+
    #     |   1|2016-08-29|   1|  2|  3|null|null|
    #     |   2|2016-08-29|   1|  2|  3|null|null|
    #     |   3|2016-08-29|   1|  2|  3|null|null|
    #     |   5|2016-08-29|null|  1|  2|   3|   4|
    #     |   6|2016-08-29|null|  1|  2|   3|   4|
    #     |   7|2016-08-29|null|  1|  2|   3|   4|
    #     +----+----------+----+---+---+----+----+

     

      January 17, 2022 1:46 PM IST
    0