Renommer les fichiers de pièces dans Hadoop Map Reduce

J'ai essayé d'utiliser le MultipleOutputs la classe comme dans l'exemple de la page http://hadoop.apache.org/docs/mapreduce/r0.21.0/api/index.html?org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html

Code Du Pilote

    Configuration conf = new Configuration();
    Job job = new Job(conf, "Wordcount");
    job.setJarByClass(WordCount.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
            Text.class, IntWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);

Code Réducteur

public class WordCountReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    private MultipleOutputs<Text, IntWritable> mos;
    public void setup(Context context){
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        //context.write(key, result);
        mos.write("text", key,result);
    }
    public void cleanup(Context context)  {
         try {
            mos.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
         }
}

La sortie du réducteur est à renommer-texte-r-00000

Mais le problème ici est que je reçois également un fichier part-r-00000 vide. Est-ce ainsi que MultipleOutputs devrait se comporter, ou y a-t-il des problème avec mon code? S'il vous plaît conseils.

Une autre alternative que j'ai essayée consiste à Parcourir mon dossier de sortie en utilisant la classe FileSystem et à renommer manuellement tous les fichiers commençant par part.

Quel est le meilleur moyen?

FileSystem hdfs = FileSystem.get(configuration);
FileStatus fs[] = hdfs.listStatus(new Path(outputPath));
for (FileStatus aFile : fs) {
if (aFile.isDir()) {
hdfs.delete(aFile.getPath(), true);
// delete all directories and sub-directories (if any) in the output directory
} 
else {
if (aFile.getPath().getName().contains("_"))
hdfs.delete(aFile.getPath(), true);
// delete all log files and the _SUCCESS file in the output directory
else {
hdfs.rename(aFile.getPath(), new Path(myCustomName));
}
}
21
demandé sur Balder 2013-01-28 08:11:58

2 réponses

Même si vous utilisez MultipleOutputs, le OutputFormat par défaut (je crois que c'est TextOutputFormat) est toujours utilisé, et donc il initialisera et créera ces fichiers part-r-xxxxx que vous voyez.

Le fait qu'ils soient vides est parce que vous n'en faites pas context.write parce que vous utilisez MultipleOutputs. Mais cela ne les empêche pas d'être créés lors de l'initialisation.

Pour vous en débarrasser, vous devez définir votre OutputFormat pour dire que vous n'attendez aucune sortie. Vous pouvez le faire ceci façon:

job.setOutputFormat(NullOutputFormat.class);

Avec cet ensemble de propriétés, cela devrait garantir que vos fichiers partiels ne sont jamais initialisés, mais que vous obtenez toujours votre sortie dans le MultipleOutputs.

Vous pouvez également probablement utiliser LazyOutputFormat qui garantirait que vos fichiers de sortie ne sont créés que quand / s'il y a des données, et ne pas initialiser les fichiers vides. Vous pourriez le faire de cette façon:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; 
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

Remarque que vous utilisez dans votre Reducer le prototype MultipleOutputs.write(String namedOutput, K key, V value), qui utilise par défaut un chemin de sortie qui sera généré basé sur votre namedOutput à quelque chose comme: {namedOutput}-(m|r)-{part-number}. Si vous voulez avoir plus de contrôle sur vos noms de fichiers de sortie, vous devez utiliser le prototype MultipleOutputs.write(String namedOutput, K key, V value, String baseOutputPath) qui peut vous permettre d'obtenir des noms de fichiers générés à l'exécution en fonction de vos clés/valeurs.

21
répondu Charles Menguy 2013-01-28 04:46:25

C'est tout ce que vous devez faire dans la classe Driver pour changer le nom de base du fichier de sortie: job.getConfiguration().set("mapreduce.output.basename", "text"); Donc, cela se traduira par vos fichiers appelés "text-R-00000".

11
répondu RHolland 2015-02-03 12:08:55