Enchaîner plusieurs emplois MapReduce dans Hadoop

Dans de nombreuses situations réelles où vous appliquez MapReduce, les algorithmes finaux finissent par être plusieurs étapes MapReduce.

C'est-à-dire Map1, Reduce1, Map2, Reduce2, et ainsi de suite.

Vous avez donc la sortie de la dernière réduction nécessaire comme entrée pour la carte suivante.

Les données intermédiaires sont quelque chose que vous (en général) ne voulez pas conserver une fois le pipeline terminé avec succès. Aussi parce que ces données intermédiaires sont en général une structure de données (comme un "carte" ou un "set"), vous ne voulez pas mettre trop d'effort dans l'écriture et la lecture de ces paires clé-valeur.

Quelle est la façon recommandée de le faire dans Hadoop?

Existe-t-il un exemple (simple) qui montre comment gérer correctement ces données intermédiaires, y compris le nettoyage par la suite?

116
demandé sur Ani Menon 2010-03-23 14:55:14

13 réponses

Je pense que ce tutoriel sur le réseau de développeurs de Yahoo vous aidera avec ceci: chaining Jobs

Vous utilisez le JobClient.runJob(). Le chemin de sortie des données du premier travail devient le chemin d'entrée de votre deuxième travail. Ces doivent être passés comme arguments à vos travaux avec le code approprié pour les analyser et définir les paramètres pour le travail.

Je pense que la méthode ci-dessus pourrait cependant être la façon dont l'API mapred maintenant plus ancienne l'a fait, mais elle devrait toujours fonctionner. Il y aura un méthode similaire dans la nouvelle API mapreduce mais je ne suis pas sûr de ce que c'est.

En ce qui concerne la suppression de données intermédiaires après la fin d'un travail, vous pouvez le faire dans votre code. La façon dont je l'ai fait avant utilise quelque chose comme:

FileSystem.delete(Path f, boolean recursive);

Où le chemin est l'emplacement sur HDFS des données. Vous devez vous assurer que vous ne supprimez ces données qu'une fois qu'aucun autre travail ne l'exige.

53
répondu Binary Nerd 2016-02-07 04:54:30

Il y a plusieurs façons de le faire.

(1) emplois en cascade

Créez l'objet JobConf "job1" pour le premier travail et définissez tous les paramètres avec "input" comme inputdirectory et "temp" comme répertoire de sortie. Exécuter ce travail:

JobClient.run(job1).

Juste en dessous, créez L'objet JobConf " job2 "pour le second travail et définissez tous les paramètres avec" temp "comme inputdirectory et" output " comme répertoire de sortie. Exécuter ce travail:

JobClient.run(job2).

(2) Créer deux JobConf objets et définir tous les paramètres comme (1) sauf que vous n'utilisez pas JobClient.exécuter.

Créez ensuite deux objets Job avec jobconfs comme paramètres:

Job job1=new Job(jobconf1); 
Job job2=new Job(jobconf2);

À l'aide de l'objet jobControl, vous spécifiez les dépendances de travail, puis exécutez les tâches:

JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();

(3) Si vous avez besoin d'une structure un peu comme Map + / Reduce / Map*, vous pouvez utiliser les classes ChainMapper et ChainReducer fournies avec Hadoop version 0.19 et partir de.

Santé

17
répondu user381928 2015-11-10 08:31:21

Il y a en fait un certain nombre de façons de le faire. Je vais me concentrer sur deux.

L'un est via Riffle ( http://github.com/cwensel/riffle ) une bibliothèque d'annotations pour identifier les choses dépendantes et les 'exécuter' dans l'ordre des dépendances (topologiques).

Ou vous pouvez utiliser une Cascade (et MapReduceFlow) en Cascade ( http://www.cascading.org/ ). Une future version prendra en charge les annotations Riffle, mais cela fonctionne très bien maintenant avec les travaux raw MR JobConf.

Une variante à ce sujet est de ne pas gérer les emplois MR à la main du tout, mais de développer votre application en utilisant L'API en cascade. Ensuite, le jobconf et le chaînage des tâches sont gérés en interne via le planificateur en cascade et les classes de flux.

De cette façon, vous passez votre temps à vous concentrer sur votre problème, pas sur les mécanismes de gestion des tâches Hadoop, etc. Vous pouvez même superposer différentes langues (comme clojure ou jruby) pour simplifier encore plus votre développement et vos applications. http://www.cascading.org/modules.html

7
répondu cwensel 2010-03-26 19:25:42

J'ai fait un chaînage de travail en utilisant les objets jobconf les uns après les autres. J'ai pris L'exemple WordCount pour enchaîner les travaux. Un travail figure combien de fois un mot a répété dans la sortie donnée. Le deuxième travail prend la première sortie de travail en entrée et calcule le total des mots dans l'entrée donnée. Voici le code qui doit être placé dans la classe Driver.

    //First Job - Counts, how many times a word encountered in a given file 
    JobConf job1 = new JobConf(WordCount.class);
    job1.setJobName("WordCount");

    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    job1.setMapperClass(WordCountMapper.class);
    job1.setCombinerClass(WordCountReducer.class);
    job1.setReducerClass(WordCountReducer.class);

    job1.setInputFormat(TextInputFormat.class);
    job1.setOutputFormat(TextOutputFormat.class);

    //Ensure that a folder with the "input_data" exists on HDFS and contains the input files
    FileInputFormat.setInputPaths(job1, new Path("input_data"));

    //"first_job_output" contains data that how many times a word occurred in the given file
    //This will be the input to the second job. For second job, input data name should be
    //"first_job_output". 
    FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));

    JobClient.runJob(job1);


    //Second Job - Counts total number of words in a given file

    JobConf job2 = new JobConf(TotalWords.class);
    job2.setJobName("TotalWords");

    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    job2.setMapperClass(TotalWordsMapper.class);
    job2.setCombinerClass(TotalWordsReducer.class);
    job2.setReducerClass(TotalWordsReducer.class);

    job2.setInputFormat(TextInputFormat.class);
    job2.setOutputFormat(TextOutputFormat.class);

    //Path name for this job should match first job's output path name
    FileInputFormat.setInputPaths(job2, new Path("first_job_output"));

    //This will contain the final output. If you want to send this jobs output
    //as input to third job, then third jobs input path name should be "second_job_output"
    //In this way, jobs can be chained, sending output one to other as input and get the
    //final output
    FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));

    JobClient.runJob(job2);

Commande pour exécuter ces travaux est:

Bin / hadoop Jar TotalWords.

Nous devons donner le nom final des travaux pour le commande. Dans le cas ci-dessus, C'est TotalWords.

6
répondu psrklr 2014-10-28 06:39:55

Vous pouvez utiliser oozie pour le traitement barch de vos travaux MapReduce. http://issues.apache.org/jira/browse/HADOOP-5303

4
répondu user300313 2010-03-23 21:05:43

Vous pouvez exécuter MR chain de la manière indiquée dans le code.

VEUILLEZ NOTER: Seul le code du pilote a été fourni

public class WordCountSorting {
// here the word keys shall be sorted
      //let us write the wordcount logic first

      public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
            //THE DRIVER CODE FOR MR CHAIN
            Configuration conf1=new Configuration();
            Job j1=Job.getInstance(conf1);
            j1.setJarByClass(WordCountSorting.class);
            j1.setMapperClass(MyMapper.class);
            j1.setReducerClass(MyReducer.class);

            j1.setMapOutputKeyClass(Text.class);
            j1.setMapOutputValueClass(IntWritable.class);
            j1.setOutputKeyClass(LongWritable.class);
            j1.setOutputValueClass(Text.class);
            Path outputPath=new Path("FirstMapper");
            FileInputFormat.addInputPath(j1,new Path(args[0]));
                  FileOutputFormat.setOutputPath(j1,outputPath);
                  outputPath.getFileSystem(conf1).delete(outputPath);
            j1.waitForCompletion(true);
                  Configuration conf2=new Configuration();
                  Job j2=Job.getInstance(conf2);
                  j2.setJarByClass(WordCountSorting.class);
                  j2.setMapperClass(MyMapper2.class);
                  j2.setNumReduceTasks(0);
                  j2.setOutputKeyClass(Text.class);
                  j2.setOutputValueClass(IntWritable.class);
                  Path outputPath1=new Path(args[1]);
                  FileInputFormat.addInputPath(j2, outputPath);
                  FileOutputFormat.setOutputPath(j2, outputPath1);
                  outputPath1.getFileSystem(conf2).delete(outputPath1, true);
                  System.exit(j2.waitForCompletion(true)?0:1);
      }

}

LA SÉQUENCE EST

(TÂCHE1)CARTE->RÉDUIRE-> (JOB2)CARTE
Cela a été fait pour obtenir les clés triées mais il y a plus de façons telles que l'utilisation d'un treemap
Pourtant, je veux concentrer votre attention sur la façon dont les emplois ont été enchaînés!!
Merci

4
répondu Aniruddha Sinha 2015-10-27 13:12:19

Il y a des exemples dans le projet Apache Mahout qui enchaînent plusieurs tâches MapReduce. Un des exemples peut être trouvé à:

RecommenderJob.java

Http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob

3
répondu Christie English 2011-05-25 23:38:52

Nous pouvons utiliser la méthode waitForCompletion(true) du Job pour définir la dépendance entre le job.

Dans mon scénario, j'avais 3 emplois qui dépendaient les uns des autres. Dans la classe de pilote, j'ai utilisé le code ci-dessous et cela fonctionne comme prévu.

public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub

        CCJobExecution ccJobExecution = new CCJobExecution();

        Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]);
        Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]);
        Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]);

        System.out.println("****************Started Executing distanceTimeFraudJob ================");
        distanceTimeFraudJob.submit();
        if(distanceTimeFraudJob.waitForCompletion(true))
        {
            System.out.println("=================Completed DistanceTimeFraudJob================= ");
            System.out.println("=================Started Executing spendingFraudJob ================");
            spendingFraudJob.submit();
            if(spendingFraudJob.waitForCompletion(true))
            {
                System.out.println("=================Completed spendingFraudJob================= ");
                System.out.println("=================Started locationFraudJob================= ");
                locationFraudJob.submit();
                if(locationFraudJob.waitForCompletion(true))
                {
                    System.out.println("=================Completed locationFraudJob=================");
                }
            }
        }
    }
3
répondu Shivaprasad 2013-01-28 07:48:48

La nouvelle classe org.Apache.hadoop.mapreduce.lib.chaîne.Chainmapper aide ce scénario

2
répondu Xavi 2016-02-01 16:29:30

Bien qu'il existe des moteurs de flux de travail Hadoop complexes basés sur le serveur, par exemple, oozie, j'ai une bibliothèque java simple qui permet l'exécution de plusieurs tâches Hadoop en tant que flux de travail. La configuration du travail et le workflow définissant la dépendance entre les tâches sont configurés dans un fichier JSON. Tout est configurable en externe et ne nécessite aucune modification de l'implémentation Map reduce existante pour faire partie d'un workflow.

Les détails peuvent être trouvés ici. Le code Source et jar est disponible en github.

Http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/

Pranab

1
répondu Pranab 2011-05-26 18:58:56

Je pense qu'oozie aide les travaux conséquents à recevoir les entrées directement du travail précédent. Cela évite l'opération d'E/S effectuée avec jobcontrol.

1
répondu stholy 2012-11-13 22:28:59

Si vous souhaitez enchaîner vos tâches par programme, vous devrez utiliser JobControl. L'utilisation est assez simple:

    JobControl jobControl = new JobControl(name);

Après cela, vous ajoutez des instances ControlledJob. ControlledJob définit un travail avec ses dépendances, branchant ainsi automatiquement les entrées et les sorties pour s'adapter à une "chaîne" de tâches.

    jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2));

    jobControl.run();

Démarre la chaîne. Vous voudrez mettre cela dans un fil speerate. Cela permet de vérifier l'état de votre chaîne lorsqu'elle s'exécute:

    while (!jobControl.allFinished()) {
        System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size());
        System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size());
        System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size());
        List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList();
        System.out.println("Jobs in success state: " + successfulJobList.size());
        List<ControlledJob> failedJobList = jobControl.getFailedJobList();
        System.out.println("Jobs in failed state: " + failedJobList.size());
    }
1
répondu Erik Schmiegelow 2014-02-28 16:52:46

Comme vous l'avez mentionné dans votre exigence que vous voulez que o / p de MRJob1 soit l'i / p de MRJob2 et ainsi de suite, vous pouvez envisager d'utiliser Oozie workflow pour ce cas d'utilisation. Vous pouvez également envisager d'écrire vos données intermédiaires sur HDFS car elles seront utilisées par le prochain MRJob. Et une fois le travail terminé, vous pouvez nettoyer vos données intermédiaires.

<start to="mr-action1"/>
<action name="mr-action1">
   <!-- action for MRJob1-->
   <!-- set output path = /tmp/intermediate/mr1-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="mr-action2">
   <!-- action for MRJob2-->
   <!-- set input path = /tmp/intermediate/mr1-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="success">
        <!-- action for success-->
    <ok to="end"/>
    <error to="end"/>
</action>

<action name="fail">
        <!-- action for fail-->
    <ok to="end"/>
    <error to="end"/>
</action>

<end name="end"/>

0
répondu Neha Kumari 2018-03-02 10:43:26