Apache Pig: exécution parallèle et aplatie de réducteurs

j'ai implémenté un script de cochon Apache. Quand j'exécute le script, il en résulte de nombreux mappers pour une étape spécifique, mais n'a qu'un réducteur pour cette étape. En raison de cette condition (beaucoup de mappers, un réducteur) le cluster Hadoop est presque inactif tandis que le réducteur simple exécute. Afin de mieux utiliser les ressources du cluster, je voudrais également avoir de nombreux réducteurs fonctionnant en parallèle.

même si je mets le parallélisme dans le script Pig en utilisant le paramètre DEFAULT_PARALLEL commande j'ai toujours raison dans le fait d'avoir seulement 1 réducteur.

La partie de code de délivrer le problème est le suivant:

SET DEFAULT_PARALLEL 5;
inputData = LOAD 'input_data.txt' AS (group_name:chararray, item:int);
inputDataGrouped = GROUP inputData BY (group_name);
-- The GeneratePairsUDF generates a bag containing pairs of integers, e.g. {(1, 5), (1, 8), ..., (8, 5)}
pairs = FOREACH inputDataGrouped GENERATE GeneratePairsUDF(inputData.item) AS pairs_bag;
pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);

les alias "inputData" et "inputdatagroupé" sont calculés dans le mapper.

les "paires" et les "paires" du réducteur.

si je change le script en supprimant la ligne avec la commande FLATTEN (pairsFlat = FOREACH pairs GENERATE FLATTEN (pairs_bag) AS (item1:int, item2:int);) alors l'exécution donne 5 réducteurs (et ainsi, dans une exécution parallèle).

il semble que la commande FLATTEN soit le problème et évite que beaucoup de réducteurs soient créés.

comment atteindre le même résultat de FLATTEN mais avoir le script exécuté en parallèle (avec de nombreux réducteurs)?

Edit:

expliquer le plan lorsque vous avez deux FOREACH (comme ci-dessus):

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-32
|   |
|   Project[chararray][0] - scope-33
|
|---inputData: New For Each(false,false)[bag] - scope-29
    |   |
    |   Cast[chararray] - scope-24
    |   |
    |   |---Project[bytearray][0] - scope-23
    |   |
    |   Cast[int] - scope-27
    |   |
    |   |---Project[bytearray][1] - scope-26
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-22--------


Reduce Plan
pairsFlat: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---pairsFlat: New For Each(true)[bag] - scope-41
    |   |
    |   Project[bag][0] - scope-39
    |
    |---pairs: New For Each(false)[bag] - scope-38
        |   |
        |   POUserFunc(GeneratePairsUDF)[bag] - scope-36
        |   |
        |   |---Project[bag][1] - scope-35
        |       |
        |       |---Project[bag][1] - scope-34
        |
        |---inputDataGrouped: Package[tuple]{chararray} - scope-31--------
Global sort: false

expliquer le plan lorsqu'il n'y a QU'un seul avant avec un emballage aplati de L'UDF:

Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-29
|   |
|   Project[chararray][0] - scope-30
|
|---inputData: New For Each(false,false)[bag] - scope-26
    |   |
    |   Cast[chararray] - scope-21
    |   |
    |   |---Project[bytearray][0] - scope-20
    |   |
    |   Cast[int] - scope-24
    |   |
    |   |---Project[bytearray][1] - scope-23
    |
    |---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-19--------


Reduce Plan
pairs: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---pairs: New For Each(true)[bag] - scope-35
    |   |
    |   POUserFunc(GeneratePairsUDF)[bag] - scope-33
    |   |
    |   |---Project[bag][1] - scope-32
    |       |
    |       |---Project[bag][1] - scope-31
    |
    |---inputDataGrouped: Package[tuple]{chararray} - scope-28--------
Global sort: false
35
demandé sur ROMANIA_engineer 2013-11-07 16:00:23

4 réponses

Il n'y a pas de garantie si pig utilise la valeur DEFAULT_PARALLEL pour chaque étape dans le script pig. Essayez le parallèle avec votre join / group step spécifique que vous sentez prendre le temps (dans votre groupe de cas step).

 inputDataGrouped = GROUP inputData BY (group_name) PARALLEL 67;

si cela ne fonctionne toujours pas alors vous pourriez avoir à voir vos données pour problème d'asymétrie.

3
répondu Ashish 2014-06-17 11:12:35

je pense qu'il y a une asymétrie dans les données. Seul un petit nombre de mappeurs produisent une production exponentiellement importante. Regardez la distribution des clés dans vos données. Comme les données contiennent peu de groupes avec un grand nombre d'enregistrements.

1
répondu Tanveer 2014-06-17 07:28:45

j'ai essayé "set default parallel "et" PARALLEL 100 " mais pas de chance. Cochon utilise encore 1 réducteur.

il s'est avéré que je dois générer un nombre aléatoire de 1 à 100 pour chaque enregistrement et regrouper ces enregistrements par ce nombre aléatoire.

nous perdons du temps sur le regroupement, mais il est beaucoup plus rapide pour moi parce que maintenant je peux utiliser plus de réducteurs.

voici le code (SUBMITTER est mon propre UDF):

tmpRecord = FOREACH record GENERATE (int)(RANDOM()*100.0) as rnd, data;
groupTmpRecord = GROUP tmpRecord BY rnd;
result = FOREACH groupTmpRecord GENERATE FLATTEN(SUBMITTER(tmpRecord));
1
répondu user3110379 2014-06-19 23:17:51

pour répondre à votre question, nous devons d'abord savoir combien de réducteurs pig applique pour accomplir le - processus de réorganisation globale. Parce que selon ma compréhension la génération / Projection ne devrait pas exiger un réducteur simple. Je ne peux pas en dire autant de Flatten. Toutefois, nous savons de bon sens que pendant flatten le but est de démêler les tuples des sacs et vice versa. Et pour ce faire tous les tuples appartenant à un sac devrait certainement être disponible dans le même réducteur. J'ai peut-être mauvais. Mais quelqu'un peut-il ajouter quelque chose ici pour obtenir une réponse à cet utilisateur s'il vous plaît ?

0
répondu nitinr708 2016-10-08 01:15:11