Hive cluster par vs ordre par vs Trier par

Pour autant que je comprends;

  • Trier par trie uniquement avec dans le réducteur

  • Order by commande les choses globalement mais pousse tout en un réducteurs

  • Cluster by distribue intelligemment les choses dans les réducteurs par le hachage de clé et fait un tri par

Ma question est donc de savoir si le cluster garantit un ordre global? distribute by met les mêmes clés dans les mêmes réducteurs mais qu'en est-il des clés adjacentes?

Le seul document I peut trouver sur ceci est ici et à partir de l'exemple, il semble qu'il les Commande globalement. Mais d'après la définition, j'ai l'impression qu'il ne le fait pas toujours.

46
demandé sur cashmere 2012-12-05 05:42:47

5 réponses

Une réponse plus courte: Oui, CLUSTER BY garantit une commande globale, à condition que vous soyez prêt à joindre les multiples fichiers de sortie vous-même.

La version plus longue:

  • ORDER BY x: garantit la commande globale, mais le fait en poussant toutes les données à travers un seul réducteur. C'est fondamentalement inacceptable pour les grands ensembles de données. Vous finissez par un fichier trié en sortie.
  • SORT BY x: commande des données à chacun des n réducteurs, mais chaque réducteur peut recevoir des plages de données qui se chevauchent. Vous vous retrouvez avec N ou plusieurs fichiers triés avec des plages qui se chevauchent.
  • DISTRIBUTE BY x: assure que chacun des n réducteurs obtient des plages de x qui ne se chevauchent pas, mais ne trie pas la sortie de chaque réducteur. Vous vous retrouvez avec N ou des fichiers non triés avec des plages qui ne se chevauchent pas.
  • CLUSTER BY x: assure que chacun des n réducteurs obtient des plages qui ne se chevauchent pas, puis trie par ces plages au niveau des réducteurs. Cela vous donne un ordre global, et c'est la même chose que doing (DISTRIBUTE BY x et SORT BY x). Vous vous retrouvez avec N ou plusieurs fichiers triés sans chevauchement gamme.

Sens? Donc CLUSTER BY est fondamentalement la version la plus évolutive de ORDER BY.

124
répondu Lars Yencken 2013-09-17 01:09:05

Permettez-moi de clarifier d'abord: clustered by distribue uniquement vos clés dans différents seaux, clustered by ... sorted by obtenir des seaux triés.

Avec une expérience simple (voir ci-dessous), vous pouvez voir que vous n'obtiendrez pas l'ordre mondial par défaut. La raison en est que le partitionneur par défaut divise les clés en utilisant des codes de hachage indépendamment de l'ordre réel des clés.

Cependant, vous pouvez obtenir vos données totalement commandées.

La Motivation est "Hadoop: le guide définitif" par Tom White (3ème édition, Chapitre 8, p. 274, total Sort), où il discute TotalOrderPartitioner.

Je vais d'abord répondre à votre question D'ordre total, puis décrire plusieurs expériences de ruche liées au tri que j'ai faites.

Gardez à l'esprit: ce que je décris ici est une 'preuve de concept', j'ai pu gérer un seul exemple en utilisant la distribution Cdh3 de Claudera.

À l'origine, j'espérais que org.Apache.hadoop.mapred.lib.TotalOrderPartitioner fera l'affaire. Malheureusement ce n'est pas le cas car il ressemble à des partitions de ruche par valeur, pas de clé. Donc, je le patche (devrait avoir une sous-classe, mais je n'ai pas le temps pour cela):

Remplacer

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
}

Avec

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(value);
}

Maintenant, vous pouvez définir (patché) TotalOrderPartitioner comme votre partitionneur de Ruche:

hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

hive> set total.order.partitioner.natural.order=false

hive> set total.order.partitioner.path=/user/yevgen/out_data2

, j'ai aussi utilisé

hive> set hive.enforce.bucketing = true; 

hive> set mapred.reduce.tasks=4;

Dans mes tests.

File out_data2 indique à TotalOrderPartitioner comment stocker les valeurs. Vous générez out_data2 en échantillonnant vos données. Dans mes tests, j'ai utilisé des seaux 4 et des clés de 0 à 10. J'ai généré out_data2 en utilisant ad-hoc approche:

import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.fs.FileSystem;


public class TotalPartitioner extends Configured implements Tool{
    public static void main(String[] args) throws Exception{
            ToolRunner.run(new TotalPartitioner(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Path partFile = new Path("/home/yevgen/out_data2");
        FileSystem fs = FileSystem.getLocal(getConf());

        HiveKey key = new HiveKey();
        NullWritable value = NullWritable.get();

        SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);
        key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why
        writer.append(key, value);
        key.set( new byte[]{1, 6}, 0, 2);//partition at 6
        writer.append(key, value);
        key.set( new byte[]{1, 9}, 0, 2);//partition at 9
        writer.append(key, value);
        writer.close();
        return 0;
    }

}

Ensuite, j'ai copié out_data2 résultant en HDFS (dans / user / yevgen / out_data2)

Avec ces paramètres, mes données ont été bucketed / triées(voir le dernier élément de ma liste d'expériences).

Voici mes expériences.

  • Créer des exemples de données

    Bash > echo-E "1\n3\n2\n4\n5\n7\n6\n8\N9\n0" > données.txt

  • Créer une table de test de base:

    Hive > créer un test de table (X int); hive > charger les données inpath locales 'données.txt " dans le tableau essai;

Fondamentalement, cette table contient des valeurs de 0 à 9 sans ordre.

  • Démontrer comment fonctionne la copie de table (vraiment mapred.réduire.tasks paramètre qui définit le nombre MAXIMAL de tâches de réduction à utiliser)

    Hive > créer la table test2(x int);

    Ruche > définir mapred.réduire.tâches=4;

    Ruche> insérer, remplacer le tableau test2 sélectionnez A. x dans le test a joindre le test b sur un.x=b.x; -- stupied se joindre à la force non négligeable réduire la carte

    Bash > hadoop fs -chat /utilisateur/ruche/entrepôt/test2/000001_0

    1

    5

    9

  • Démontrer seaux. Vous pouvez voir que les clés sont assingées au hasard sans aucun ordre de tri:

    Ruche > créer une table test3(x int) clustered par (x) dans 4 seaux;

    Ruche > définir la ruche.appliquer.écopage = true;

    Ruche> insérer, remplacer le tableau test3 sélectionnez * à partir du test;

    Bash> hadoop fs -chat / utilisateur / ruche/entrepôt / test3 / 000000_0

    4

    8

    0

  • Seaux avec tri. Les résultats sont partiellement triés, pas totalement triés

    Ruche > créer une table test4(x int) cluster par (x) trié par (X desc) dans 4 seaux;

    Ruche> insérer, remplacer le tableau test4 sélectionnez * à partir du test;

    Bash> hadoop fs -chat /utilisateur/ruche/entrepôt/test4/000001_0

    1

    5

    9

Vous pouvez voir que les valeurs sont triés dans l'ordre croissant. On dirait un bug de ruche dans CDH3?

  • Obtenir partiellement trié sans cluster par instruction:

    Hive > créer la table test5 comme sélectionnez x de test distribuer par x Trier par X desc;

    Bash> hadoop fs -chat /utilisateur/ruche/entrepôt/test5/000001_0

    9

    5

    1

  • Utilisez mon TotalOrderParitioner patché:

    Ruche > ensemble ruche.mapred.outil de partitionnement=org.Apache.hadoop.mapred.lib.TotalOrderPartitioner;

    Ruche > ensemble total.ordre.outil de partitionnement.naturel.ordre = false

    Ruche > ensemble total.ordre.outil de partitionnement.chemin= / utilisateur / formation / out_data2

    Hive > créer une table test6(x int) regroupé par (x) trié par (x) en 4 compartiments;

    Ruche> insérer, remplacer le tableau test6 sélectionnez * à partir du test;

    Bash> hadoop fs -chat /utilisateur/ruche/entrepôt/test6/000000_0

    1

    2

    0

    Bash> hadoop fs -chat /utilisateur/ruche/entrepôt/test6/000001_0

    3

    4

    5

    Bash> hadoop fs -chat /utilisateur/ruche/entrepôt/test6/000002_0

    7

    6

    8

    Bash> hadoop fs -chat /utilisateur/ruche/entrepôt/test6/000003_0

    9

13
répondu Yevgen Yampolskiy 2012-12-05 09:32:51

Si je comprends bien, la réponse courte est non. Vous obtiendrez des plages qui se chevauchent.

À partir de la documentation SortBy : "Cluster By est un raccourci pour distribuer par et Trier par." "Toutes les lignes avec la même distribution par colonnes iront au même réducteur." Mais il n'y a pas d'informations qui distribuent par garantie non-chevauchement des gammes.

De plus, à partir de documentation DDL BucketedTables : "Comment Hive distribue-t-elle les rangées à travers les seaux? En général, le seau nombre est déterminé par l'expression hash_function(bucketing_column) mod num_buckets." Je suppose que L'instruction Cluster by in Select utilise le même principe pour distribuer les lignes entre les réducteurs car son utilisation principale est de remplir des tables bucketed avec les données.

J'ai créé une table avec 1 colonne entière "a", et y ai inséré des nombres de 0 à 9.

Ensuite, je mets le nombre de réducteurs à 2 set mapred.reduce.tasks = 2;.

Et select les données de cette table avec Cluster by clause select * from my_tab cluster by a;

Et résultat reçu que j'attendais:

0
2
4
6
8
1
3
5
7
9

Donc, le premier réducteur (nombre 0) a des nombres pairs (parce que leur mode 2 donne 0)

Et le deuxième réducteur (numéro 1) a des nombres impairs (parce que leur mode 2 donne 1)

Voilà comment fonctionne "Distribute By".

Puis "Trier par" trie les résultats à l'intérieur de chaque réducteur.

4
répondu Anton Zaviriukhin 2018-09-19 23:50:40

CLUSTER BY ne produit pas de commande globale.

La réponse acceptée (par Lars Yencken) induit en erreur en indiquant que les réducteurs recevront des plages non chevauchantes. Comme Anton Zaviriukhin pointe correctement vers la documentation BucketedTables, CLUSTER BY est essentiellement distribué par (identique à bucketing) plus Trier par dans chaque seau/réducteur. Et distribuer simplement des hachages et des mods dans des seaux et tandis que la fonction de hachage Peut préserver l'ordre (hachage de i > hachage de J SI i > j), mod de la valeur de hachage ne fonctionne pas.

Voici un meilleur exemple montrant des plages qui se chevauchent

Http://myitlearnings.com/bucketing-in-hive/

3
répondu Edi Bice 2016-08-04 15:59:38

Cluster by EST PAR TRI réducteur Non global. Dans de nombreux livres, il est également mentionné de manière incorrecte ou confuse. Il a une utilisation particulière où disons que vous distribuez chaque département à un réducteur spécifique, puis Triez par nom d'employé dans chaque département et ne vous souciez pas de l'ordre du département non le cluster à utiliser et plus performant que la charge de travail est répartie entre les réducteurs.

0
répondu user3423890 2017-04-19 14:25:56