Encoder pour les ensembles de données Spark

je voudrais écrire un codeur Ligne tapez un ensemble de données, pour une opération cartographique que je fais. Essentiellement, je ne comprends pas comment écrire les encodeurs.

ci-Dessous est un exemple d'une opération de carte:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
            @Override
            public Iterator<String> call(Row row) throws Exception {

                ArrayList<String> obj = //some map operation
                return obj.iterator();
            }
        },Encoders.STRING());

je comprends qu'au lieu d'une chaîne de Codeur doit être écrite de la façon suivante:

    Encoder<Row> encoder = new Encoder<Row>() {
        @Override
        public StructType schema() {
            return join.schema();
            //return null;
        }

        @Override
        public ClassTag<Row> clsTag() {
            return null;
        }
    };

cependant, je ne comprends pas le clsTag () dans l'encodeur, et j'essaye de trouver un exemple d'exécution qui peut démostrer quelque chose de similaire (i.e. un encodeur pour un type de ligne)

Modifier - Ce n'est pas une copie de la question visée : erreur D'encodage en essayant de mapper la ligne dataframe à la ligne mise à jour comme la réponse parle de L'utilisation de Spark 1.X en étincelle 2.x (Je ne le fais pas), aussi je suis à la recherche d'un encodeur pour une classe de rang plutôt que de résoudre une erreur. Enfin, je cherchais une solution à Java, pas à Scala.

17
demandé sur Community 2017-04-05 21:13:56

2 réponses

La réponse est d'utiliser un RowEncoder et le schéma de l'ensemble de données en utilisant TypeStruct.

ci-Dessous est un exemple de travail d'un flatmap opération sur les ensembles de données:

    StructType structType = new StructType();
    structType = structType.add("id1", DataTypes.LongType, false);
    structType = structType.add("id2", DataTypes.LongType, false);

    ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);

    Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() {
        @Override
        public Iterator<Row> call(Row row) throws Exception {
            // a static map operation to demonstrate
            List<Object> data = new ArrayList<>();
            data.add(1l);
            data.add(2l);
            ArrayList<Row> list = new ArrayList<>();
            list.add(RowFactory.create(data.toArray()));
            return list.iterator();
        }
    }, encoder);
14
répondu tsar2512 2017-04-08 14:37:05

j'ai eu le même problème... Encoders.kryo(Row.class)) a fonctionné pour moi.

en bonus, Les Docs de tuning D'Apache Spark font référence à Kryo it car il est plus rapide à la sérialisation "souvent autant que 10x":

https://spark.apache.org/docs/latest/tuning.html

3
répondu Jim Bob 2018-01-26 16:41:26