Hadoop DistributedCache est déprécié - Quelle est l'API préférée?

Mes tâches de map ont besoin de quelques données de configuration, que je voudrais distribuer via le Cache distribué.

Le Hadoop MapReduce Tutorial spectacles utilisation de la classe DistributedCache, à peu près comme suit:

// In the driver
JobConf conf = new JobConf(getConf(), WordCount.class);
...
DistributedCache.addCacheFile(new Path(filename).toUri(), conf); 

// In the mapper
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job);
...

Cependant, DistributedCachemarqué comme obsolète dans Hadoop 2.2.0.

Quelle est la nouvelle façon privilégiée d'y parvenir? Y a-t-il un exemple ou un tutoriel à jour concernant cette API?

33
demandé sur tolgap 2014-01-20 20:53:09

6 réponses

les API pour le Cache distribué peuvent être trouvées dans la classe Job elle-même. Consultez la documentation ici:http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html Le code devrait être quelque chose comme

Job job = new Job();
...
job.addCacheFile(new Path(filename).toUri());

dans votre code mapper:

Path[] localPaths = context.getLocalCacheFiles();
...
49
répondu user2371156 2014-01-20 17:53:27

pour développer sur @jtravaglini, la façon préférée d'utiliser DistributedCache pour les FILS/MapReduce 2 est comme suit:

Dans votre pilote, utilisez le Job.addCacheFile()

public int run(String[] args) throws Exception {
    Configuration conf = getConf();

    Job job = Job.getInstance(conf, "MyJob");

    job.setMapperClass(MyMapper.class);

    // ...

    // Mind the # sign after the absolute file location.
    // You will be using the name after the # sign as your
    // file name in your Mapper/Reducer
    job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some"));
    job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other"));

    return job.waitForCompletion(true) ? 0 : 1;
}

Et dans votre Mapper/Réducteur, remplacer le setup(Context context) méthode:

@Override
protected void setup(
        Mapper<LongWritable, Text, Text, Text>.Context context)
        throws IOException, InterruptedException {
    if (context.getCacheFiles() != null
            && context.getCacheFiles().length > 0) {

        File some_file = new File("./some");
        File other_file = new File("./other");

        // Do things to these two files, like read them
        // or parse as JSON or whatever.
    }
    super.setup(context);
}
18
répondu tolgap 2014-10-17 08:36:15

la nouvelle API DistributedCache pour YARN / MR2 se trouve dans le org.apache.hadoop.mapreduce.Job classe.

   Job.addCacheFile()

malheureusement, il n'y a pas encore beaucoup d'exemples de ce genre.

http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Job.html#addCacheFile%28java.net.URI%29

5
répondu jtravaglini 2014-01-20 17:58:04

je n'ai pas d'offres d'emploi.addCacheFile (). À la place, j'ai utilisé l'option-files comme "-files /path/to/myfile.txt # myfile " comme avant. Puis dans le code mapper ou reducer j'utilise la méthode ci-dessous:

/**
 * This method can be used with local execution or HDFS execution. 
 * 
 * @param context
 * @param symLink
 * @param throwExceptionIfNotFound
 * @return
 * @throws IOException
 */
public static File findDistributedFileBySymlink(JobContext context, String symLink, boolean throwExceptionIfNotFound) throws IOException
{
    URI[] uris = context.getCacheFiles();
    if(uris==null||uris.length==0)
    {
        if(throwExceptionIfNotFound)
            throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
        return null;
    }
    URI symlinkUri = null;
    for(URI uri: uris)
    {
        if(symLink.equals(uri.getFragment()))
        {
            symlinkUri = uri;
            break;
        }
    }   
    if(symlinkUri==null)
    {
        if(throwExceptionIfNotFound)
            throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
        return null;
    }
    //if we run this locally the file system URI scheme will be "file" otherwise it should be a symlink
    return "file".equalsIgnoreCase(FileSystem.get(context.getConfiguration()).getScheme())?(new File(symlinkUri.getPath())):new File(symLink);

}

Puis dans mapper/réducteur:

@Override
protected void setup(Context context) throws IOException, InterruptedException
{
    super.setup(context);

    File file = HadoopUtils.findDistributedFileBySymlink(context,"myfile",true);
    ... do work ...
}

Note que si j'ai utilisé "-les fichiers /chemin/vers/monfichier.txt" directement puis-je utiliser "monfichier.txt" pour accéder au fichier depuis qui par défaut est le nom du lien symbolique.

2
répondu Jackie Jiang 2015-10-15 01:10:03

aucune des solutions mentionnées n'a fonctionné pour moi dans son intégralité . Il pourrait parce que la version Hadoop ne cesse de changer j'utilise hadoop 2.6.4. Essentiellement, DistributedCache est déprécié, donc je ne voulais pas l'utiliser. Comme certains post nous suggèrent d'utiliser addCacheFile() cependant, il a un peu changé. Voici comment cela a fonctionné pour moi

job.addCacheFile(new URI("hdfs://X.X.X.X:9000/EnglishStop.txt#EnglishStop.txt"));

ici X. X. X peut être l'adresse IP maître ou localhost. The EnglishStop.txt a été stocké dans HDFS at / location.

hadoop fs -ls /

le la sortie est

-rw-r--r--   3 centos supergroup       1833 2016-03-12 20:24 /EnglishStop.txt
drwxr-xr-x   - centos supergroup          0 2016-03-12 19:46 /test

Drôle, mais pratique, #EnglishStop.txt signifie Maintenant que nous pouvons y accéder en tant qu '" English Stop.txt" dans le mappeur. Voici le code pour le même

public void setup(Context context) throws IOException, InterruptedException     
{
    File stopwordFile = new File("EnglishStop.txt");
    FileInputStream fis = new FileInputStream(stopwordFile);
    BufferedReader reader = new BufferedReader(new InputStreamReader(fis));

    while ((stopWord = reader.readLine()) != null) {
        // stopWord is a word read from Cache
    }
}

Cette question a fonctionné pour moi. Vous pouvez lire la ligne du fichier stocké dans HDFS

1
répondu Somum 2016-03-13 10:30:12

j'ai eu le même problème. Et non seulement DistributedCach est déprécié, mais getlocalchefiles et" nouveau travail " aussi. Donc ce qui a fonctionné pour moi est la suivante:

Pilote:

Configuration conf = getConf();
Job job = Job.getInstance(conf);
...
job.addCacheFile(new Path(filename).toUri());

dans la configuration Mapper/Reducer:

@Override
protected void setup(Context context) throws IOException, InterruptedException
{
    super.setup(context);

    URI[] files = context.getCacheFiles(); // getCacheFiles returns null

    Path file1path = new Path(files[0])
    ...
}
0
répondu patapouf_ai 2015-06-01 12:33:01