Encoder pour les ensembles de données Spark
je voudrais écrire un codeur Ligne tapez un ensemble de données, pour une opération cartographique que je fais. Essentiellement, je ne comprends pas comment écrire les encodeurs.
ci-Dessous est un exemple d'une opération de carte:
In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>
Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
@Override
public Iterator<String> call(Row row) throws Exception {
ArrayList<String> obj = //some map operation
return obj.iterator();
}
},Encoders.STRING());
je comprends qu'au lieu d'une chaîne de Codeur doit être écrite de la façon suivante:
Encoder<Row> encoder = new Encoder<Row>() {
@Override
public StructType schema() {
return join.schema();
//return null;
}
@Override
public ClassTag<Row> clsTag() {
return null;
}
};
cependant, je ne comprends pas le clsTag () dans l'encodeur, et j'essaye de trouver un exemple d'exécution qui peut démostrer quelque chose de similaire (i.e. un encodeur pour un type de ligne)
Modifier - Ce n'est pas une copie de la question visée : erreur D'encodage en essayant de mapper la ligne dataframe à la ligne mise à jour comme la réponse parle de L'utilisation de Spark 1.X en étincelle 2.x (Je ne le fais pas), aussi je suis à la recherche d'un encodeur pour une classe de rang plutôt que de résoudre une erreur. Enfin, je cherchais une solution à Java, pas à Scala.
2 réponses
La réponse est d'utiliser un RowEncoder et le schéma de l'ensemble de données en utilisant TypeStruct.
ci-Dessous est un exemple de travail d'un flatmap opération sur les ensembles de données:
StructType structType = new StructType();
structType = structType.add("id1", DataTypes.LongType, false);
structType = structType.add("id2", DataTypes.LongType, false);
ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() {
@Override
public Iterator<Row> call(Row row) throws Exception {
// a static map operation to demonstrate
List<Object> data = new ArrayList<>();
data.add(1l);
data.add(2l);
ArrayList<Row> list = new ArrayList<>();
list.add(RowFactory.create(data.toArray()));
return list.iterator();
}
}, encoder);
j'ai eu le même problème... Encoders.kryo(Row.class))
a fonctionné pour moi.
en bonus, Les Docs de tuning D'Apache Spark font référence à Kryo it car il est plus rapide à la sérialisation "souvent autant que 10x":