Comment faire CopyMerge en Hadoop 3.0?
je sais hadoop
version 2.7
's FileUtil
a la copyMerge
fonction qui fusionne plusieurs fichiers en un seul.
mais la fonction copyMerge
n'est plus supportée par L'API dans la version 3.0
.
des idées pour fusionner tous les fichiers d'un répertoire dans un nouveau fichier unique dans la version 3.0
de hadoop?
3 réponses
FileUtil#copyMerge a été retirée. Voir les détails du changement majeur:
https://issues.apache.org/jira/browse/HADOOP-12967
https://issues.apache.org/jira/browse/HADOOP-11392
vous pouvez utiliser getmerge
Usage: hadoop fs-getmerge [- nl]
prend un répertoire source et un fichier de destination comme entrée et concaténate les fichiers de src dans le fichier local de destination. Optionnellement-nl peut être défini pour permettre l'ajout d'un caractère de nouvelle ligne (LF) à la fin de chaque fichier. - skip-empty-file peut être utilisé pour éviter les caractères newline indésirables en cas de fichiers vides.
exemples:
hadoop fs -getmerge -nl /src /opt/output.txt
hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt
Code De Sortie: Renvoie 0 en cas de succès et non-zéro en cas d'erreur.
j'avais la même question et j'ai dû ré-implémenter copyMerge (en PySpark bien, mais en utilisant les mêmes appels d'API comme à l'origine copyMerge).
N'ont aucune idée pourquoi il n'y a pas de fonctionnalité équivalente dans Hadoop 3. Nous devons fusionner très souvent des fichiers à partir d'un répertoire HDFS vers un fichier HDFS.
voici la mise en œuvre dans pySpark I référencé ci-dessus https://github.com/Tagar/stuff/blob/master/copyMerge.py
comme FileUtil.copyMerge()
a été déprécié et retiré de L'API de départ version 3, une solution simple consiste à re-mettre en œuvre nous-mêmes.
ici est le Java implémentation originale des versions précédentes.
voici un Scala rewrite:
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import java.io.IOException
def copyMerge(
srcFS: FileSystem, srcDir: Path,
dstFS: FileSystem, dstFile: Path,
deleteSource: Boolean, conf: Configuration
): Boolean = {
if (dstFS.exists(dstFile))
throw new IOException(s"Target $dstFile already exists")
// Source path is expected to be a directory:
if (srcFS.getFileStatus(srcDir).isDirectory()) {
val outputFile = dstFS.create(dstFile)
Try {
srcFS
.listStatus(srcDir)
.sortBy(_.getPath.getName)
.collect {
case status if status.isFile() =>
val inputFile = srcFS.open(status.getPath())
Try(IOUtils.copyBytes(inputFile, outputFile, conf, false))
inputFile.close()
}
}
outputFile.close()
if (deleteSource) srcFS.delete(srcDir, true) else true
}
else false
}