Hoe kom ik erachter als een Vonk DataFrame is een kolom

Wanneer ik een DataFrame van een JSON-bestand in de Spark SQL, hoe kan ik zien of een bepaalde kolom bestaat voor het aanroepen van .select

Bijvoorbeeld JSON schema:

{
  "a": {
    "b": 1,
    "c": 2
  }
}

Dit is wat ik wil doen:

potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))

maar ik kan niet vinden van een goede functie voor hasColumn. De dichtstbijzijnde ik heb gekregen is om te testen of de kolom is in deze wat lastige matrix:

scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)
InformationsquelleAutor ben | 2016-03-09



9 Replies
  1. 78

    Gewoon aannemen dat het bestaat en laten mislukken met Try. Duidelijk en eenvoudig en ondersteunt een willekeurige nesten:

    import scala.util.Try
    import org.apache.spark.sql.DataFrame
    
    def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess
    
    val df = sqlContext.read.json(sc.parallelize(
      """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil))
    
    hasColumn(df, "foobar")
    //Boolean = false
    
    hasColumn(df, "foo")
    //Boolean = true
    
    hasColumn(df, "foo.bar")
    //Boolean = true
    
    hasColumn(df, "foo.bar.foobar")
    //Boolean = true
    
    hasColumn(df, "foo.bar.foobaz")
    //Boolean = false

    Of nog eenvoudiger:

    val columns = Seq(
      "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz")
    
    columns.flatMap(c => Try(df(c)).toOption)
    //Seq[org.apache.spark.sql.Column] = List(
    //  foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)

    Python-equivalent:

    from pyspark.sql.utils import AnalysisException
    from pyspark.sql import Row
    
    
    def has_column(df, col):
        try:
            df[col]
            return True
        except AnalysisException:
            return False
    
    df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF()
    
    has_column(df, "foobar")
    ## False
    
    has_column(df, "foo")
    ## True
    
    has_column(df, "foo.bar")
    ## True
    
    has_column(df, "foo.bar.foobar")
    ## True
    
    has_column(df, "foo.bar.foobaz")
    ## False
    • Dit werkt met een gestructureerd veld. De oplossingen die gebruikt contains functie niet! +1
    • Op het eerste gezicht de df(path) of df[col] eruit ziet zoals het zou een zeer dure test, maar dit is al een luie dag te bouwen, dus het is goedkoop, is dat correct?
    • Dat heeft weinig te maken met luiheid. Columns zijn geen gegevens containers, maar onderdelen van query beschrijving van het model. In bredere context heeft om te weten iets over Dataset u verwerken en controleren van de logische QueryExectution.analyzed plan, dat geldt voor col / apply (zowel resolve) of schema / columns gelijk.
    • Dank dat is precies wat ik bedoelde. Ik heb het niet over lazy richtlijnen in Scala, maar eerder de gefaseerde aanpak van hoe Spark maakt een logisch plan, transformeert tot een fysieke plan en voert de taken van het cluster. Als dit is opgelost op het logische plan fase dan is het goedkoper.
  2. 35

    Een andere optie die ik normaal gebruik is

    df.columns.contains("column-name-to-check")

    Dit geeft als resultaat een boolean

    • Ja dat klopt het niet werken met geneste kolommen.
  3. 13

    Eigenlijk hoef je niet eens nodig hebt om te bellen selecteren om het gebruik van kolommen, kunt u gewoon bellen op het dataframe zelf

    //define test data
    case class Test(a: Int, b: Int)
    val testList = List(Test(1,2), Test(3,4))
    val testDF = sqlContext.createDataFrame(testList)
    
    //define the hasColumn function
    def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName)
    
    //then you can just use it on the DF with a given column name
    hasColumn(testDF, "a")  //<-- true
    hasColumn(testDF, "c")  //<-- false

    U kunt een impliciete klasse met behulp van de pimp my library (mijn bibliotheek patroon, zodat de hasColumn methode is beschikbaar op uw dataframes direct

    implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) {
        def hasColumn(colName: String) = df.columns.contains(colName)
    }

    Daarna kunt u het gebruiken als:

    testDF.hasColumn("a") //<-- true
    testDF.hasColumn("c") //<-- false
    • Dit werkt niet met de geneste kolommen. van json {"a":{"b":1,"c":0}}
  4. 4

    Try is niet optimaal als het evalueert de expressie binnen Try voordat hij de beslissing neemt.

    Voor grote data sets, het gebruik van de hieronder in Scala:

    df.schema.fieldNames.contains("column_name")
  5. 3

    Uw andere optie zou zijn om te doen wat array manipulatie (in dit geval een intersect) op de df.columns en uw potential_columns.

    //Loading some data (so you can just copy & paste right into spark-shell)
    case class Document( a: String, b: String, c: String)
    val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF
    
    //The columns we want to extract
    val potential_columns = Seq("b", "c", "d")
    
    //Get the intersect of the potential columns and the actual columns, 
    //we turn the array of strings into column objects
    //Finally turn the result into a vararg (: _*)
    df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show

    Helaas dit zal niet werken voor je innerlijke object scenario hierboven. Je zal moeten kijken naar het schema voor.

    Ik ben van plan om te veranderen van uw potential_columns volledig gekwalificeerde kolom namen

    val potential_columns = Seq("a.b", "a.c", "a.d")
    
    //Our object model
    case class Document( a: String, b: String, c: String)
    case class Document2( a: Document, b: String, c: String)
    
    //And some data...
    val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF
    
    //We go through each of the fields in the schema.
    //For StructTypes we return an array of parentName.fieldName
    //For everything else we return an array containing just the field name
    //We then flatten the complete list of field names
    //Then we intersect that with our potential_columns leaving us just a list of column we want
    //we turn the array of strings into column objects
    //Finally turn the result into a vararg (: _*)
    df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show

    Dit gaat slechts één niveau diep, dus om het generieke u zou hebben om meer werk te doen.

  6. 1

    Voor degenen die struikelen over deze op zoek naar een Python oplossing, die ik gebruik:

    if 'column_name_to_check' in df.columns:
        # do something

    Toen ik probeerde @Jai Prakash het antwoord van df.columns.contains('column-name-to-check') met behulp van Python, ik heb AttributeError: 'list' object has no attribute 'contains'.

  7. 0

    Als u versnipperen uw json met behulp van een schema definitie wanneer u laden dan hoef je ook niet te controleren voor de kolom. als het niet in de json-bron wordt weergegeven als een null-kolom.

            val schemaJson = """
      {
          "type": "struct",
          "fields": [
              {
                "name": field1
                "type": "string",
                "nullable": true,
                "metadata": {}
              },
              {
                "name": field2
                "type": "string",
                "nullable": true,
                "metadata": {}
              }
          ]
      }
            """
        val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
    
        val djson = sqlContext.read
        .schema(schema )
        .option("badRecordsPath", readExceptionPath)
        .json(dataPath)
  8. 0

    In PySpark, df.kolommen geeft u een lijst van kolommen in het dataframe, dus
    “colName” in df.kolommen
    zou return True of False. Probeer op. Goed geluk!

  9. -1
    def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) =
      Try(df.select(colName)).isSuccess

    Gebruik maken van de hierboven genoemde functie om te controleren of het bestaan van een kolom, inclusief geneste kolom naam.

Geef een reactie

Het e-mailadres wordt niet gepubliceerd. Vereiste velden zijn gemarkeerd met *