Comment faire CopyMerge en Hadoop 3.0?

je sais hadoop version 2.7 's FileUtil a la copyMerge fonction qui fusionne plusieurs fichiers en un seul.

mais la fonction copyMerge n'est plus supportée par L'API dans la version 3.0 .

des idées pour fusionner tous les fichiers d'un répertoire dans un nouveau fichier unique dans la version 3.0 de hadoop?

5
demandé sur Xavier Guihot 2017-02-04 05:08:47

3 réponses

La méthode

FileUtil#copyMerge a été retirée. Voir les détails du changement majeur:

https://issues.apache.org/jira/browse/HADOOP-12967

https://issues.apache.org/jira/browse/HADOOP-11392

vous pouvez utiliser getmerge

Usage: hadoop fs-getmerge [- nl]

prend un répertoire source et un fichier de destination comme entrée et concaténate les fichiers de src dans le fichier local de destination. Optionnellement-nl peut être défini pour permettre l'ajout d'un caractère de nouvelle ligne (LF) à la fin de chaque fichier. - skip-empty-file peut être utilisé pour éviter les caractères newline indésirables en cas de fichiers vides.

exemples:

hadoop fs -getmerge -nl /src /opt/output.txt
hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt

Code De Sortie: Renvoie 0 en cas de succès et non-zéro en cas d'erreur.

https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#getmerge

3
répondu ravi 2017-02-04 03:42:04

j'avais la même question et j'ai dû ré-implémenter copyMerge (en PySpark bien, mais en utilisant les mêmes appels d'API comme à l'origine copyMerge).

N'ont aucune idée pourquoi il n'y a pas de fonctionnalité équivalente dans Hadoop 3. Nous devons fusionner très souvent des fichiers à partir d'un répertoire HDFS vers un fichier HDFS.

voici la mise en œuvre dans pySpark I référencé ci-dessus https://github.com/Tagar/stuff/blob/master/copyMerge.py

2
répondu Tagar 2017-11-29 18:06:05

comme FileUtil.copyMerge() a été déprécié et retiré de L'API de départ version 3, une solution simple consiste à re-mettre en œuvre nous-mêmes.

ici est le Java implémentation originale des versions précédentes.

voici un Scala rewrite:

import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import java.io.IOException

def copyMerge(
    srcFS: FileSystem, srcDir: Path,
    dstFS: FileSystem, dstFile: Path,
    deleteSource: Boolean, conf: Configuration
): Boolean = {

  if (dstFS.exists(dstFile))
    throw new IOException(s"Target $dstFile already exists")

  // Source path is expected to be a directory:
  if (srcFS.getFileStatus(srcDir).isDirectory()) {

    val outputFile = dstFS.create(dstFile)
    Try {
      srcFS
        .listStatus(srcDir)
        .sortBy(_.getPath.getName)
        .collect {
          case status if status.isFile() =>
            val inputFile = srcFS.open(status.getPath())
            Try(IOUtils.copyBytes(inputFile, outputFile, conf, false))
            inputFile.close()
        }
    }
    outputFile.close()

    if (deleteSource) srcFS.delete(srcDir, true) else true
  }
  else false
}
0
répondu Xavier Guihot 2018-05-26 18:31:52