Flux Java 8 avec traitement par lots

j'ai un gros fichier qui contient une liste d'éléments.

je voudrais créer un lot d'items, faire une requête HTTP avec ce lot (tous les items sont nécessaires comme paramètres dans la requête HTTP). Je peux le faire très facilement avec une boucle for , mais en tant qu'amateur de Java 8, je veux essayer d'écrire ceci avec le stream framework de Java 8 (et récolter les bénéfices du traitement paresseux).

exemple:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

je veux faire quelque chose d'une longue ligne de lazyFileStream.group(500).map(processBatch).collect(toList())

Quelle serait la meilleure façon de le faire?

62
demandé sur Tagir Valeev 2015-06-04 13:27:40

10 réponses

Vous pourriez le faire avec jOOλ , une bibliothèque qui s'étend Java 8 flux de single-threaded, flux séquentiel des cas d'utilisation:

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

Derrière les coulisses zipWithIndex() est juste:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

... attendu que groupBy() est la commodité de L'API pour:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(clause de non-responsabilité: je travaille pour la société derrière jOOλ)

13
répondu Lukas Eder 2016-02-16 22:07:29

pour l'exhaustivité, voici une solution Goyave .

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process);

dans la question la collection est disponible de sorte qu'un flux n'est pas nécessaire et il peut être écrit comme,

Iterables.partition(data, batchSize).forEach(this::process);
75
répondu Ben Manes 2017-12-28 18:29:55

L'implémentation Java Pure-8 est également possible:

int BATCH = 500;
IntStream.range(0, (data.size()+BATCH-1)/BATCH)
         .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH)))
         .forEach(batch -> process(batch));

notez que contrairement à JOOl il peut fonctionner correctement en parallèle (à condition que votre data soit une liste d'accès aléatoire).

31
répondu Tagir Valeev 2015-06-07 15:12:20

Pure Java 8 solution :

nous pouvons créer un collecteur personnalisé pour faire cela élégamment, qui prend dans un batch size et un Consumer pour traiter chaque lot:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.*;
import java.util.stream.Collector;

import static java.util.Objects.requireNonNull;


/**
 * Collects elements in the stream and calls the supplied batch processor
 * after the configured batch size is reached.
 *
 * In case of a parallel stream, the batch processor may be called with
 * elements less than the batch size.
 *
 * The elements are not kept in memory, and the final result will be an
 * empty list.
 *
 * @param <T> Type of the elements being collected
 */
class BatchCollector<T> implements Collector<T, List<T>, List<T>> {

    private final int batchSize;
    private final Consumer<List<T>> batchProcessor;


    /**
     * Constructs the batch collector
     *
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     */
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        batchProcessor = requireNonNull(batchProcessor);

        this.batchSize = batchSize;
        this.batchProcessor = batchProcessor;
    }

    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    public BiConsumer<List<T>, T> accumulator() {
        return (ts, t) -> {
            ts.add(t);
            if (ts.size() >= batchSize) {
                batchProcessor.accept(ts);
                ts.clear();
            }
        };
    }

    public BinaryOperator<List<T>> combiner() {
        return (ts, ots) -> {
            // process each parallel list without checking for batch size
            // avoids adding all elements of one to another
            // can be modified if a strict batching mode is required
            batchProcessor.accept(ts);
            batchProcessor.accept(ots);
            return Collections.emptyList();
        };
    }

    public Function<List<T>, List<T>> finisher() {
        return ts -> {
            batchProcessor.accept(ts);
            return Collections.emptyList();
        };
    }

    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

créer En Option une classe d'utilité helper:

import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;

public class StreamUtils {

    /**
     * Creates a new batch collector
     * @param batchSize the batch size after which the batchProcessor should be called
     * @param batchProcessor the batch processor which accepts batches of records to process
     * @param <T> the type of elements being processed
     * @return a batch collector instance
     */
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) {
        return new BatchCollector<T>(batchSize, batchProcessor);
    }
}

exemple d'usage:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> output = new ArrayList<>();

int batchSize = 3;
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs);

input.stream()
     .collect(StreamUtils.batchCollector(batchSize, batchProcessor));

j'ai aussi posté mon code sur GitHub, si quelqu'un veut jeter un oeil:

lien vers Github

24
répondu rohitvats 2017-07-26 12:35:14

j'ai écrit un Spliterator personnalisé pour des scénarios comme celui-ci. Il remplira les listes d'une taille donnée à partir du Flux d'entrée. L'avantage de cette approche est qu'elle effectuera un traitement paresseux, et fonctionnera avec d'autres fonctions de flux.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) {
    return batchSize <= 0
        ? Stream.of(stream.collect(Collectors.toList()))
        : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel());
}

private static class BatchSpliterator<E> implements Spliterator<List<E>> {

    private final Spliterator<E> base;
    private final int batchSize;

    public BatchSpliterator(Spliterator<E> base, int batchSize) {
        this.base = base;
        this.batchSize = batchSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<E>> action) {
        final List<E> batch = new ArrayList<>(batchSize);
        for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++)
            ;
        if (batch.isEmpty())
            return false;
        action.accept(batch);
        return true;
    }

    @Override
    public Spliterator<List<E>> trySplit() {
        if (base.estimateSize() <= batchSize)
            return null;
        final Spliterator<E> splitBase = this.base.trySplit();
        return splitBase == null ? null
                : new BatchSpliterator<>(splitBase, batchSize);
    }

    @Override
    public long estimateSize() {
        final double baseSize = base.estimateSize();
        return baseSize == 0 ? 0
                : (long) Math.ceil(baseSize / (double) batchSize);
    }

    @Override
    public int characteristics() {
        return base.characteristics();
    }

}
8
répondu Bruce Hamilton 2017-01-19 17:45:01

vous pouvez également utiliser RxJava :

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch));

ou

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList();

ou

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList();
7
répondu frhack 2015-07-03 15:43:30

, Vous pouvez aussi jeter un oeil à cyclope-réagir , je suis l'auteur de cette bibliothèque. Il implémente l'interface jOOλ (et par extension les flux JDK 8), mais contrairement aux flux parallèles JDK 8, il se concentre sur les opérations asynchrones (comme le blocage potentiel des appels I/O asynchrones). JDK Flux Parallèles, en revanche l'accent sur le parallélisme de données pour en CPU opérations. Il fonctionne en gérant les agrégats des tâches futures basées sous la hotte, mais présente une norme étendue Flux API pour les utilisateurs finaux.

ce code d'exemple peut vous aider à démarrer

LazyFutureStream.parallelCommonBuilder()
                .react(data)
                .grouped(BATCH_SIZE)                  
                .map(this::process)
                .run();

il y a un tutorial sur la mise en lots ici

et un tutoriel plus général ici

pour utiliser votre propre Pool de threads (ce qui est probablement plus approprié pour bloquer l'entrée/sortie), vous pouvez commencer le traitement avec

     LazyReact reactor = new LazyReact(40);

     reactor.react(data)
            .grouped(BATCH_SIZE)                  
            .map(this::process)
            .run();
6
répondu John McClean 2016-04-01 18:46:21

nous avions un problème similaire à résoudre. Nous voulions prendre un flux qui était plus grand que la mémoire système (itérant à travers tous les objets dans une base de données) et randomiser l'ordre aussi bien que possible - nous avons pensé qu'il serait correct de tamponner 10.000 articles et de les randomiser.

la cible était une fonction qui prenait dans un flux.

des solutions proposées ici, il semble y avoir un éventail d'options:

  • Utilisation divers non-java 8 bibliothèques supplémentaires
  • commencer par quelque chose qui n'est pas un flux-par exemple une liste d'accès aléatoire
  • ont un flux qui peut être divisé facilement dans un spliterator

Notre instinct était à l'origine l'utilisation d'un collecteur personnalisé, mais cela signifiait l'abandon de la diffusion. La solution custom collector ci-dessus est très bonne et nous l'avons presque utilisée.

Voici une solution qui trompe par en utilisant le fait que Stream s peut vous donner un Iterator que vous pouvez utiliser comme une trappe d'évacuation pour vous permettre de faire quelque chose de plus que les ruisseaux ne supportent pas. Le Iterator est reconverti en un flux en utilisant un autre bit de sorcellerie Java 8 StreamSupport .

/**
 * An iterator which returns batches of items taken from another iterator
 */
public class BatchingIterator<T> implements Iterator<List<T>> {
    /**
     * Given a stream, convert it to a stream of batches no greater than the
     * batchSize.
     * @param originalStream to convert
     * @param batchSize maximum size of a batch
     * @param <T> type of items in the stream
     * @return a stream of batches taken sequentially from the original stream
     */
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) {
        return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize));
    }

    private static <T> Stream<T> asStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator,ORDERED),
            false);
    }

    private int batchSize;
    private List<T> currentBatch;
    private Iterator<T> sourceIterator;

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
        this.batchSize = batchSize;
        this.sourceIterator = sourceIterator;
    }

    @Override
    public boolean hasNext() {
        prepareNextBatch();
        return currentBatch!=null && !currentBatch.isEmpty();
    }

    @Override
    public List<T> next() {
        return currentBatch;
    }

    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(sourceIterator.next());
        }
    }
}

un exemple simple d'utilisation de ceci ressemblerait à ceci:

@Test
public void getsBatches() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        .forEach(System.out::println);
}

ci-dessus affiche

[A, B, C]
[D, E, F]

pour notre utilisez case, nous voulions mélanger les lots et puis les garder comme un flux - il ressemblait à ceci:

@Test
public void howScramblingCouldBeDone() {
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3)
        // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one
        .map(list -> {
            Collections.shuffle(list); return list; })
        .flatMap(List::stream)
        .forEach(System.out::println);
}

cette sortie quelque chose comme (il est randomisé, donc différent à chaque fois)

A
C
B
E
D
F

la sauce secrète ici est qu'il y a toujours un jet, donc vous pouvez soit opérer sur un jet de lots, ou faire quelque chose à chaque lot et puis flatMap il de nouveau à un jet. Encore mieux, toutes les courses ci-dessus que la finale forEach ou collect ou d'autres expressions terminales PULL les données à travers le flux.

il s'avère que iterator est un type spécial de fin de l'opération sur un flux et ne cause pas le flux entier à courir et venir dans la mémoire! Merci aux gars de Java 8 pour un design brillant!

4
répondu Ashley Frieze 2017-03-01 12:55:02

exemple Simple utilisant Spliterator

    // read file into stream, try-with-resources
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
        //skip header
        Spliterator<String> split = stream.skip(1).spliterator();
        Chunker<String> chunker = new Chunker<String>();
        while(true) {              
            boolean more = split.tryAdvance(chunker::doSomething);
            if (!more) {
                break;
            }
        }           
    } catch (IOException e) {
        e.printStackTrace();
    }
}

static class Chunker<T> {
    int ct = 0;
    public void doSomething(T line) {
        System.out.println(ct++ + " " + line.toString());
        if (ct % 100 == 0) {
            System.out.println("====================chunk=====================");               
        }           
    }       
}

Bruce réponse est plus complète, mais je cherchais quelque chose de rapide et sale pour traiter un tas de fichiers.

0
répondu rhinmass 2017-11-23 09:20:13

pur exemple Java 8 qui fonctionne aussi avec des flux parallèles.

comment utiliser:

Stream<Integer> integerStream = IntStream.range(0, 45).parallel().boxed();
CsStreamUtil.processInBatch(integerStream, 10, batch -> System.out.println("Batch: " + batch));

La méthode de déclaration et de mise en œuvre:

public static <ElementType> void processInBatch(Stream<ElementType> stream, int batchSize, Consumer<Collection<ElementType>> batchProcessor)
{
    List<ElementType> newBatch = new ArrayList<>(batchSize);

    stream.forEach(element -> {
        List<ElementType> fullBatch;

        synchronized (newBatch)
        {
            if (newBatch.size() < batchSize)
            {
                newBatch.add(element);
                return;
            }
            else
            {
                fullBatch = new ArrayList<>(newBatch);
                newBatch.clear();
                newBatch.add(element);
            }
        }

        batchProcessor.accept(fullBatch);
    });

    if (newBatch.size() > 0)
        batchProcessor.accept(new ArrayList<>(newBatch));
}
0
répondu Nicolas Mongrain-Lacombe 2018-08-20 16:53:31