Meilleure façon de convertir un champ string en timestamp en Spark

j'ai un CSV dans lequel un champ est datetime dans un format spécifique. Je ne peux pas l'importer directement dans mon Dataframe car il doit être un timestamp. Donc je l'importe comme chaîne de caractères et le transforme en Timestamp comme ceci

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row

def getTimestamp(x:Any) : Timestamp = {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    if (x.toString() == "") 
    return null
    else {
        val d = format.parse(x.toString());
        val t = new Timestamp(d.getTime());
        return t
    }
}

def convert(row : Row) : Row = {
    val d1 = getTimestamp(row(3))
    return Row(row(0),row(1),row(2),d1)
}

y a-t-il une meilleure façon, plus concise de le faire, avec l'API Dataframe ou spark-sql? La méthode ci-dessus nécessite la création d'un RDD et de donner à nouveau le schéma pour le Dataframe.

18
demandé sur Rodrigue 2015-04-24 12:45:27

6 réponses

Spark >= 2.2

import org.apache.spark.sql.functions.to_timestamp

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+-------------------+
// |id |dts                |ts                 |
// +---+-------------------+-------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2  |#$@#@#             |null               |
// +---+-------------------+-------------------+

Spark >= 1,6, < 2,2

vous pouvez utiliser des fonctions de traitement de date qui ont été introduites dans Spark 1.5. En supposant que vous avez les données suivantes:

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")

Vous pouvez utiliser unix_timestamp analyser les chaînes, et de le jeter à timestamp

import org.apache.spark.sql.functions.unix_timestamp

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+---------------------+
// |id |dts                |ts                   |
// +---+-------------------+---------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2  |#$@#@#             |null                 |
// +---+-------------------+---------------------+

comme vous pouvez le voir, il couvre à la fois l'analyse et la gestion des erreurs.

Spark >= 1,5, < 1,6

Vous aurez utiliser quelque chose comme ceci:

unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")

ou

(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")

grâce à SPARK-11724.

étincelle < 1,5

vous devriez pouvoir les utiliser avec expr et HiveContext.

40
répondu zero323 2018-07-16 17:35:15

Je n'ai pas encore joué avec Spark SQL mais je pense que ce serait plus idiomatique scala (null usage n'est pas considéré comme une bonne pratique):

def getTimestamp(s: String) : Option[Timestamp] = s match {
  case "" => None
  case _ => {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => Some(t)
      case Failure(_) => None
    }    
  }
}

remarquez s'il vous Plaît je suppose que vous savez Row types d'éléments à l'avance (si vous le lisez à partir d'un fichier csv, Tous sont String), c'est pourquoi j'ai utiliser un type comme String et non Any (tout est sous-type de Any).

cela dépend aussi de la façon dont vous voulez traiter les exceptions d'analyse. Dans ce cas, si une analyse l'exception se produit, a None est simplement retourné.

Vous pouvez l'utiliser plus loin avec:

rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3))
6
répondu jarandaf 2015-04-24 10:37:57

J'ai ISO8601 timestamp dans mon ensemble de données et j'ai dû le convertir au format "AAAA-MM-JJ". C'est ce que j'ai fait:

import org.joda.time.{DateTime, DateTimeZone}
object DateUtils extends Serializable {
  def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC)
  def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC)
}

sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd"))

Et vous pouvez simplement utiliser l'UDF dans votre étincelle de requête SQL.

1
répondu zengr 2015-11-06 07:50:28

je voudrais déplacer le getTimeStamp méthode écrite par vous dans les mapPartitions et réutiliser GenericMutableRow de rdd parmi les lignes dans un iterator:

val strRdd = sc.textFile("hdfs://path/to/cvs-file")
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter =>
  new Iterator[Row] {
    val row = new GenericMutableRow(4)
    var current: Array[String] = _

    def hasNext = iter.hasNext
    def next() = {
      current = iter.next()
      row(0) = current(0)
      row(1) = current(1)
      row(2) = current(2)

      val ts = getTimestamp(current(3))
      if(ts != null) {
        row.update(3, ts)
      } else {
        row.setNullAt(3)
      }
      row
    }
  }
}

Et vous devriez toujours utiliser le schéma pour générer un DataFrame

val df = sqlContext.createDataFrame(rowRdd, tableSchema)

l'utilisation de GenericMutableRow à l'intérieur d'une implémentation iterator pourrait être trouvée dans Agrégation De L'Opérateur, InMemoryColumnarTableScan,opérations assimilables etc.

0
répondu Yijie Shen 2015-04-25 10:51:05

je voudrais utiliser https://github.com/databricks/spark-csv

ceci inférera des horodateurs pour vous.

import com.databricks.spark.csv._
val rdd: RDD[String] = sc.textFile("csvfile.csv")

val df : DataFrame = new CsvParser().withDelimiter('|')
      .withInferSchema(true)
      .withParseMode("DROPMALFORMED")
      .csvRdd(sqlContext, rdd)
0
répondu mark 2016-07-25 22:45:41

j'ai eu quelques problèmes avec to_timestamp où il retournait une chaîne vide. Après beaucoup d'essais et d'erreurs, j'ai été capable de le contourner en le moulant comme horodatage, puis en le moulant en chaîne. J'espère que cela aide quelqu'un d'autre avec le même problème:

df.columns.intersect(cols).foldLeft(df)((newDf, col) => {
  val conversionFunc = to_timestamp(newDf(col).cast("timestamp"), "MM/dd/yyyy HH:mm:ss").cast("string")
  newDf.withColumn(col, conversionFunc)
})
0
répondu ashwin319 2018-09-19 15:21:28