
Removing duplicate columns after a DF join in Spark

When you join two DataFrames that share a column name:

df = df1.join(df2, df1['id'] == df2['id'])

The join itself works, but any later reference to the id column is ambiguous and raises the following exception:

pyspark.sql.utils.AnalysisException: "Reference 'id' is ambiguous, could be: id#5691, id#5918.;"

This makes id not usable anymore...

The following function solves the problem:

def join(df1, df2, cond, how='left'):
    df = df1.join(df2, cond, how=how)
    repeated_columns = [c for c in df1.columns if c in df2.columns]
    for col in repeated_columns:
        df = df.drop(df2[col])
    return df

What I don't like about it is that I have to iterate over the column names and delete them one by one. This looks really clunky...

Do you know of any other solution that will either join and remove duplicates more elegantly or delete multiple columns without iterating over each of them?

If the join columns in both data frames have the same names and you only need an equi-join, you can specify the join columns as a list, in which case the result keeps only one copy of each join column:

df1.show()
+---+----+
| id|val1|
+---+----+
|  1|   2|
|  2|   3|
|  4|   4|
|  5|   5|
+---+----+

df2.show()
+---+----+
| id|val2|
+---+----+
|  1|   2|
|  1|   3|
|  2|   4|
|  3|   5|
+---+----+

df1.join(df2, ['id']).show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

Otherwise, you need to give the joined data frames aliases and refer to the duplicated columns through those aliases later:

df1.alias("a").join(
    df2.alias("b"), df1['id'] == df2['id']
).select("a.id", "a.val1", "b.val2").show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+
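
With wider frames, typing the select list by hand gets tedious. Below is a minimal sketch of a helper that builds the list from the two column lists, assuming the aliases "a" and "b" used above. The name qualified_columns is hypothetical, and column names that contain dots or backticks are not handled:

```python
def qualified_columns(left_cols, right_cols, left_alias="a", right_alias="b"):
    """Build alias-qualified names: every left column, then right-only columns."""
    left_seen = set(left_cols)
    selected = [f"{left_alias}.{name}" for name in left_cols]
    selected += [
        f"{right_alias}.{name}" for name in right_cols if name not in left_seen
    ]
    return selected


# For the frames above, df1.columns is ['id', 'val1'] and df2.columns is ['id', 'val2'].
print(qualified_columns(["id", "val1"], ["id", "val2"]))
# ['a.id', 'a.val1', 'b.val2']
```

The result can then be passed to the aliased join as .select(*qualified_columns(df1.columns, df2.columns)).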

In df.join(other, on, how), when on is a column name string or a list of column name strings, the returned dataframe contains each join column only once. When on is a join expression, the result contains duplicate columns, and .drop(df.a) removes one of the copies. Example:

cond = [df.a == other.a, df.b == other.bb, df.c == other.ccc]
# result will have duplicate column a
result = df.join(other, cond, 'inner').drop(df.a)
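
When the two frames share several column names, the individual drops can be folded into one expression. This is a sketch, assuming both arguments are ordinary PySpark DataFrames. The name join_keep_left is hypothetical, and unlike the example above it drops the right-hand copy of every shared name, whether or not that name is part of the join condition:

```python
from functools import reduce


def join_keep_left(left, right, cond, how="inner"):
    """Join, then drop the right-hand copy of every column name both frames share."""
    joined = left.join(right, cond, how)
    shared = [name for name in left.columns if name in right.columns]
    # right[name] is a Column tied to `right`, so drop() targets that copy;
    # a plain string name cannot say which of the two copies is meant.
    return reduce(lambda acc, name: acc.drop(right[name]), shared, joined)
```

A call such as join_keep_left(df, other, df.a == other.a) would then leave a single column a, taken from df.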
    • That's… unintuitive (different behavior depending on form of on). But great to know -- thanks.

Assuming 'a' is a dataframe with column 'id' and 'b' is another dataframe with column 'id'

I use the following two methods to remove duplicates:

Method 1: Use a string join expression instead of a boolean expression. This automatically removes the duplicate column for you

a.join(b, 'id')

Method 2: Renaming the column before the join and dropping it after

b = b.withColumnRenamed('id', 'b_id')  # withColumnRenamed returns a new dataframe
joinexpr = a['id'] == b['b_id']
a.join(b, joinexpr).drop('b_id')
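
Method 2 extends to several shared columns by renaming each of them on the right-hand side before the join. The sketch below assumes that a prefix such as 'b_' does not collide with an existing column name; prefixed_names is a hypothetical helper:

```python
def prefixed_names(left_cols, right_cols, prefix="b_"):
    """Map each right-hand column that also exists on the left to a prefixed name."""
    left_seen = set(left_cols)
    return {name: prefix + name for name in right_cols if name in left_seen}


renames = prefixed_names(["id", "val1"], ["id", "val2"])
print(renames)  # {'id': 'b_id'}

# Applying the mapping to a PySpark DataFrame `b` (not executed here):
# for old, new in renames.items():
#     b = b.withColumnRenamed(old, new)
```

After the join, the values of the mapping are exactly the columns to drop.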

The code below works with Spark 1.6.0 and above.

salespeople_df.show()
+---+------+-----+
|Num|  Name|Store|
+---+------+-----+
|  1| Henry|  100|
|  2| Karen|  100|
|  3|  Paul|  101|
|  4| Jimmy|  102|
|  5|Janice|  103|
+---+------+-----+

storeaddress_df.show()
+-----+--------------------+
|Store|             Address|
+-----+--------------------+
|  100|    64 E Illinos Ave|
|  101|         74 Grand Pl|
|  102|          2298 Hwy 7|
|  103|No address available|
+-----+--------------------+

In this example the shared column has the same name in both data frames:

joined=salespeople_df.join(storeaddress_df, ['Store'])
joined.orderBy('Num', ascending=True).show()

+-----+---+------+--------------------+
|Store|Num|  Name|             Address|
+-----+---+------+--------------------+
|  100|  1| Henry|    64 E Illinos Ave|
|  100|  2| Karen|    64 E Illinos Ave|
|  101|  3|  Paul|         74 Grand Pl|
|  102|  4| Jimmy|          2298 Hwy 7|
|  103|  5|Janice|No address available|
+-----+---+------+--------------------+

Passing the shared column name as a list to .join prevents that column from being duplicated.
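
The column order in the result above follows a pattern: the join columns come first, then the remaining left-hand columns, then the remaining right-hand ones. Here is a plain-Python sketch of that ordering. The name using_join_columns is hypothetical, and the function only models the column names, not Spark itself:

```python
def using_join_columns(left_cols, right_cols, keys):
    """Model the column names of left.join(right, keys) for a list of key names."""
    key_set = set(keys)
    left_rest = [name for name in left_cols if name not in key_set]
    right_rest = [name for name in right_cols if name not in key_set]
    return list(keys) + left_rest + right_rest


print(using_join_columns(["Num", "Name", "Store"], ["Store", "Address"], ["Store"]))
# ['Store', 'Num', 'Name', 'Address']
```

Note that a name shared by both frames but missing from the key list still appears twice in the result.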

If you also want to remove another column, such as Num in this example, you can use .drop('colname'):

joined=joined.drop('Num')
joined.show()

+-----+------+--------------------+
|Store|  Name|             Address|
+-----+------+--------------------+
|  103|Janice|No address available|
|  100| Henry|    64 E Illinos Ave|
|  100| Karen|    64 E Illinos Ave|
|  101|  Paul|         74 Grand Pl|
|  102| Jimmy|          2298 Hwy 7|
+-----+------+--------------------+

After I've joined multiple tables together, I run them through a simple function to drop columns in the DF if it encounters duplicates while walking from left to right. Alternatively, you could rename these columns too.

Where Names is a table with columns ['Id', 'Name', 'DateId', 'Description'] and Dates is a table with columns ['Id', 'Date', 'Description'], the columns Id and Description will be duplicated after being joined.

Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = dropDupeDfCols(NamesAndDates)
NamesAndDates.write.saveAsTable("...", format="parquet", mode="overwrite", path="...")

Where dropDupeDfCols is defined as:

def dropDupeDfCols(df):
    newcols = []
    dupcols = []

    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)

    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))

    return df.toDF(*newcols)

The resulting data frame will contain columns ['Id', 'Name', 'DateId', 'Description', 'Date'].
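
A variant of the same idea keeps the first occurrence of each name with a single select instead of repeated drops. This is a sketch rather than the function above: first_occurrences and drop_dupe_cols_by_select are hypothetical names, the temporary names c0, c1, ... are an arbitrary choice, and df is assumed to be a PySpark DataFrame:

```python
def first_occurrences(columns):
    """Return (position, name) for the first occurrence of each column name."""
    seen = set()
    kept = []
    for position, name in enumerate(columns):
        if name not in seen:
            seen.add(name)
            kept.append((position, name))
    return kept


def drop_dupe_cols_by_select(df):
    kept = first_occurrences(df.columns)
    # Rename positionally so every column has a unique, unambiguous name.
    numbered = df.toDF(*[f"c{i}" for i in range(len(df.columns))])
    # Keep only the first occurrences, then restore their original names.
    selected = numbered.select(*[f"c{i}" for i, _ in kept])
    return selected.toDF(*[name for _, name in kept])


# Column names of Names joined to Dates, as described above.
joined_cols = ["Id", "Name", "DateId", "Description", "Id", "Date", "Description"]
print(first_occurrences(joined_cols))
# [(0, 'Id'), (1, 'Name'), (2, 'DateId'), (3, 'Description'), (5, 'Date')]
```

The trace shows why positions matter: the second Id and the second Description (positions 4 and 6) can only be told apart from the first ones by index, not by name.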


In my case I had a dataframe with multiple duplicate columns after joins, and I was trying to save that dataframe in CSV format, but the duplicate columns caused an error. I followed the steps below to drop the duplicate columns. The code is in Scala.

1) Rename all the duplicate columns and make a new dataframe.
2) Make a separate list of all the renamed columns.
3) Make a new dataframe with all columns (including the renamed ones from step 1).
4) Drop all the renamed columns.

import org.apache.spark.sql.DataFrame
import scala.collection.mutable

private def removeDuplicateColumns(dataFrame: DataFrame): DataFrame = {
  // Names for the renamed dataframe; repeated names get a "dup_" prefix.
  val allColumns = mutable.MutableList[String]()
  // Only the prefixed names, so they can be dropped afterwards.
  val dupColumns = mutable.MutableList[String]()
  dataFrame.columns.foreach { (i: String) =>
    if (allColumns.contains(i)) {
      allColumns += "dup_" + i
      dupColumns += "dup_" + i
    } else {
      allColumns += i
    }
  }
  // Rename positionally, then drop every prefixed column.
  val df = dataFrame.toDF(allColumns: _*)
  df.drop(dupColumns: _*)
}

To call the function, pass it the dataframe that contains the duplicate columns:

val uniColDF = removeDuplicateColumns(df)