Comment lister tous les fichiers d'un répertoire et de ses sous-répertoires dans hadoop hdfs
j'ai un dossier dans hdfs qui a deux sous-dossiers chacun a environ 30 sous-dossiers qui,enfin,chacun contient des fichiers xml. Je veux lister tous les fichiers xml donnant seulement le chemin du dossier principal. Localement je peux le faire avec apache commons-io FileUtils.listFiles(). J'ai essayé cette
FileStatus[] status = fs.listStatus( new Path( args[ 0 ] ) );
mais il ne Liste que les deux premiers sous-dossiers et il ne va pas plus loin. Est-il un moyen de le faire dans hadoop?
9 réponses
Vous aurez besoin d'utiliser le système de fichiers objet et effectuer une certaine logique sur la résultante FileStatus objets manuellement de manière récursive dans les sous-répertoires.
vous pouvez également appliquer un PathFilter pour retourner Seulement les fichiers xml en utilisant le listStatus(Chemin d'accès, PathFilter) méthode
la classe hadoop FsShell en a des exemples pour la commande hadoop fs-lsr, qui est récursive ls-voir la source, autour de la ligne 590 (le pas récursif est déclenché sur la ligne 635)
si vous utilisez hadoop 2.* API il y a de plus élégant solutions:
Configuration conf = getConf();
Job job = Job.getInstance(conf);
FileSystem fs = FileSystem.get(conf);
//the second boolean parameter here sets the recursion to true
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(
new Path("path/to/lib"), true);
while(fileStatusListIterator.hasNext()){
LocatedFileStatus fileStatus = fileStatusListIterator.next();
//do stuff with the file like ...
job.addFileToClassPath(fileStatus.getPath());
}
Avez-vous essayé ceci:
import java.io.*;
import java.util.*;
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class cat{
public static void main (String [] args) throws Exception{
try{
FileSystem fs = FileSystem.get(new Configuration());
FileStatus[] status = fs.listStatus(new Path("hdfs://test.com:9000/user/test/in")); // you need to pass in your hdfs path
for (int i=0;i<status.length;i++){
BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
String line;
line=br.readLine();
while (line != null){
System.out.println(line);
line=br.readLine();
}
}
}catch(Exception e){
System.out.println("File not found");
}
}
}
/**
* @param filePath
* @param fs
* @return list of absolute file path present in given path
* @throws FileNotFoundException
* @throws IOException
*/
public static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
List<String> fileList = new ArrayList<String>();
FileStatus[] fileStatus = fs.listStatus(filePath);
for (FileStatus fileStat : fileStatus) {
if (fileStat.isDirectory()) {
fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
} else {
fileList.add(fileStat.getPath().toString());
}
}
return fileList;
}
exemple rapide : supposons que vous ayez la structure de fichier suivante:
a -> b
-> c -> d
-> e
-> d -> f
en utilisant le code ci-dessus, vous obtenez:
a/b
a/c/d
a/c/e
a/d/f
si vous voulez seulement la feuille (c.-à-d. les noms de fichier), utilisez le code suivant dans else
bloc :
...
} else {
String fileName = fileStat.getPath().toString();
fileList.add(fileName.substring(fileName.lastIndexOf("/") + 1));
}
Cela va donner:
b
d
e
f
voici un extrait de code, qui compte le nombre de fichiers dans un répertoire HDFS particulier (je l'ai utilisé pour déterminer combien de réducteurs utiliser dans un code ETL particulier). Vous pouvez facilement modifier cela pour répondre à vos besoins.
private int calculateNumberOfReducers(String input) throws IOException {
int numberOfReducers = 0;
Path inputPath = new Path(input);
FileSystem fs = inputPath.getFileSystem(getConf());
FileStatus[] statuses = fs.globStatus(inputPath);
for(FileStatus status: statuses) {
if(status.isDirectory()) {
numberOfReducers += getNumberOfInputFiles(status, fs);
} else if(status.isFile()) {
numberOfReducers ++;
}
}
return numberOfReducers;
}
/**
* Recursively determines number of input files in an HDFS directory
*
* @param status instance of FileStatus
* @param fs instance of FileSystem
* @return number of input files within particular HDFS directory
* @throws IOException
*/
private int getNumberOfInputFiles(FileStatus status, FileSystem fs) throws IOException {
int inputFileCount = 0;
if(status.isDirectory()) {
FileStatus[] files = fs.listStatus(status.getPath());
for(FileStatus file: files) {
inputFileCount += getNumberOfInputFiles(file, fs);
}
} else {
inputFileCount ++;
}
return inputFileCount;
}
Merci Radu Adrian Moldovan pour la suggestion.
Voici une implémentation utilisant queue:
private static List<String> listAllFilePath(Path hdfsFilePath, FileSystem fs)
throws FileNotFoundException, IOException {
List<String> filePathList = new ArrayList<String>();
Queue<Path> fileQueue = new LinkedList<Path>();
fileQueue.add(hdfsFilePath);
while (!fileQueue.isEmpty()) {
Path filePath = fileQueue.remove();
if (fs.isFile(filePath)) {
filePathList.add(filePath.toString());
} else {
FileStatus[] fileStatus = fs.listStatus(filePath);
for (FileStatus fileStat : fileStatus) {
fileQueue.add(fileStat.getPath());
}
}
}
return filePathList;
}
maintenant, on peut utiliser Spark pour faire la même chose et son chemin plus rapidement que d'autres approches (comme Hadoop MR). Voici l'extrait de code.
def traverseDirectory(filePath:String,recursiveTraverse:Boolean,filePaths:ListBuffer[String]) {
val files = FileSystem.get( sparkContext.hadoopConfiguration ).listStatus(new Path(filePath))
files.foreach { fileStatus => {
if(!fileStatus.isDirectory() && fileStatus.getPath().getName().endsWith(".xml")) {
filePaths+=fileStatus.getPath().toString()
}
else if(fileStatus.isDirectory()) {
traverseDirectory(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
}
}
}
}
ne pas utiliser d'approche récursive (questions de tas)) :) utiliser une file d'attente
queue.add(param_dir)
while (queue is not empty){
directory= queue.pop
- get items from current directory
- if item is file add to a list (final list)
- if item is directory => queue.push
}
c'était facile, profitez-en!
extrait de Code pour les deux récursives et non récursives approches:
//helper method to get the list of files from the HDFS path
public static List<String>
listFilesFromHDFSPath(Configuration hadoopConfiguration,
String hdfsPath,
boolean recursive) throws IOException,
IllegalArgumentException
{
//resulting list of files
List<String> filePaths = new ArrayList<String>();
//get path from string and then the filesystem
Path path = new Path(hdfsPath); //throws IllegalArgumentException
FileSystem fs = path.getFileSystem(hadoopConfiguration);
//if recursive approach is requested
if(recursive)
{
//(heap issues with recursive approach) => using a queue
Queue<Path> fileQueue = new LinkedList<Path>();
//add the obtained path to the queue
fileQueue.add(path);
//while the fileQueue is not empty
while (!fileQueue.isEmpty())
{
//get the file path from queue
Path filePath = fileQueue.remove();
//filePath refers to a file
if (fs.isFile(filePath))
{
filePaths.add(filePath.toString());
}
else //else filePath refers to a directory
{
//list paths in the directory and add to the queue
FileStatus[] fileStatuses = fs.listStatus(filePath);
for (FileStatus fileStatus : fileStatuses)
{
fileQueue.add(fileStatus.getPath());
} // for
} // else
} // while
} // if
else //non-recursive approach => no heap overhead
{
//if the given hdfsPath is actually directory
if(fs.isDirectory(path))
{
FileStatus[] fileStatuses = fs.listStatus(path);
//loop all file statuses
for(FileStatus fileStatus : fileStatuses)
{
//if the given status is a file, then update the resulting list
if(fileStatus.isFile())
filePaths.add(fileStatus.getPath().toString());
} // for
} // if
else //it is a file then
{
//return the one and only file path to the resulting list
filePaths.add(path.toString());
} // else
} // else
//close filesystem; no more operations
fs.close();
//return the resulting list
return filePaths;
} // listFilesFromHDFSPath