Comment puis-je créer un séparateur de pagination à usage général?

Je voudrais pouvoir traiter un flux java en lecture à partir d'une source qui doit être accessible dans les pages. Comme première approche, j'ai implémenté un itérateur de pagination qui demandait simplement des pages lorsque la page actuelle était à court d'éléments, puis utilisé StreamSupport.stream(iterator, false) pour obtenir un handle de flux sur l'itérateur.

Comme j'ai trouvé que mes pages sont assez chères à récupérer, je voudrais accéder aux pages par le biais d'un flux parallèle. À ce stade j'ai découvert que le parallélisme fourni par mon approche naïve est inexistant en raison de l'implémentation spliterator que java fournit directement à partir d'un itérateur. Comme je connais beaucoup les éléments que je voudrais traverser (je connais le nombre total de résultats après avoir demandé la première page, et la source prend en charge un décalage et une limite), je pense qu'il devrait être possible d'implémenter mon propre spliterator qui réalise une concurrence réelle (à la fois dans le travail effectué sur les éléments D'une

J'ai pu réaliser le " travail effectué sur éléments " concurrence assez facilement, mais dans mon implémentation initiale, l'interrogation d'une page n'est jamais effectuée que par le spliterator le plus haut et ne bénéficie donc pas de la division du travail offerte par l'implémentation Fork-join.

Comment puis-je écrire un spliterator qui atteint ces deux objectifs?

Pour référence, je vais fournir ce que j'ai fait jusqu'à présent (je sais qu'il ne divise pas les requêtes de manière appropriée).

   public final class PagingSourceSpliterator<T> implements Spliterator<T> {

    public static final long DEFAULT_PAGE_SIZE = 100;

    private Page<T> result;
    private Iterator<T> results;
    private boolean needsReset = false;
    private final PageProducer<T> generator;
    private long offset = 0L;
    private long limit = DEFAULT_PAGE_SIZE;


    public PagingSourceSpliterator(PageProducer<T> generator) {
        this.generator = generator;
    }

    public PagingSourceSpliterator(long pageSize, PageProducer<T> generator) {
        this.generator = generator;
        this.limit = pageSize;
    }


    @Override
    public boolean tryAdvance(Consumer<? super T> action) {

        if (hasAnotherElement()) {
            if (!results.hasNext()) {
                loadPageAndPrepareNextPaging();
            }
            if (results.hasNext()) {
                action.accept(results.next());
                return true;
            }
        }

        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        // if we know there's another page, go ahead and hand off whatever
        // remains of this spliterator as a new spliterator for other
        // threads to work on, and then mark that next time something is
        // requested from this spliterator it needs to be reset to the head
        // of the next page
        if (hasAnotherPage()) {
            Spliterator<T> other = result.getPage().spliterator();
            needsReset = true;
            return other;
        } else {
            return null;
        }

    }

    @Override
    public long estimateSize() {
        if(limit == 0) {
            return 0;
        }

        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return result.getTotalResults();
    }

    @Override
    public int characteristics() {
        return IMMUTABLE | ORDERED | DISTINCT | NONNULL | SIZED | SUBSIZED;
    }

    private boolean hasAnotherElement() {
        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return isBound() && (results.hasNext() || hasAnotherPage());
    }

    private boolean hasAnotherPage() {
        ensureStateIsUpToDateEnoughToAnswerInquiries();
        return isBound() && (result.getTotalResults() > offset);
    }

    private boolean isBound() {
        return Objects.nonNull(results) && Objects.nonNull(result);
    }

    private void ensureStateIsUpToDateEnoughToAnswerInquiries() {
        ensureBound();
        ensureResetIfNecessary();
    }

    private void ensureBound() {
        if (!isBound()) {
            loadPageAndPrepareNextPaging();
        }
    }

    private void ensureResetIfNecessary() {
        if(needsReset) {
            loadPageAndPrepareNextPaging();
            needsReset = false;
        }
    }

    private void loadPageAndPrepareNextPaging() {
        // keep track of the overall result so that we can reference the original list and total size
        this.result = generator.apply(offset, limit);

        // make sure that the iterator we use to traverse a single page removes
        // results from the underlying list as we go so that we can simply pass
        // off the list spliterator for the trySplit rather than constructing a
        // new kind of spliterator for what remains.
        this.results = new DelegatingIterator<T>(result.getPage().listIterator()) {
            @Override
            public T next() {
                T next = super.next();
                this.remove();
                return next;
            }
        };

        // update the paging for the next request and inquiries prior to the next request
        // we use the page of the actual result set instead of the limit in case the limit
        // was not respected exactly.
        this.offset += result.getPage().size();
    }

    public static class DelegatingIterator<T> implements Iterator<T> {

        private final Iterator<T> iterator;

        public DelegatingIterator(Iterator<T> iterator) {
            this.iterator = iterator;
        }


        @Override
        public boolean hasNext() {
            return iterator.hasNext();
        }

        @Override
        public T next() {
            return iterator.next();
        }

        @Override
        public void remove() {
            iterator.remove();
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            iterator.forEachRemaining(action);
        }
    }
}

Et la source de mon pages:

public interface PageProducer<T> extends BiFunction<Long, Long, Page<T>> {

}

Et une page:

public final class Page<T> {

    private long totalResults;
    private final List<T> page = new ArrayList<>();

    public long getTotalResults() {
        return totalResults;
    }

    public List<T> getPage() {
        return page;
    }

    public Page setTotalResults(long totalResults) {
        this.totalResults = totalResults;
        return this;
    }

    public Page setPage(List<T> results) {
        this.page.clear();
        this.page.addAll(results);
        return this;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Page)) {
            return false;
        }
        Page<?> page1 = (Page<?>) o;
        return totalResults == page1.totalResults && Objects.equals(page, page1.page);
    }

    @Override
    public int hashCode() {
        return Objects.hash(totalResults, page);
    }

}

Et un exemple d'obtention d'un flux avec une pagination "lente" pour les tests

private <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {

    PageProducer<T> producer = (offset, limit) -> {

        try {
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        int beginIndex = offset.intValue();
        int endIndex = Math.min(offset.intValue() + limit.intValue(), things.size());
        return new Page<T>().setTotalResults(things.size())
                .setPage(things.subList(beginIndex, endIndex));
    };

    return StreamSupport.stream(new PagingSourceSpliterator<>(pageSize, producer), true);
}
22
demandé sur Holger 2016-06-30 19:05:10

2 réponses

La raison principale, votre spliterator ne vous rapproche pas de votre objectif, est qu'il essaie de diviser les pages, plutôt que l'espace de l'élément source. Si vous connaissez le nombre total d'éléments et avez une source permettant d'extraire une page via offset et limit, la forme la plus naturelle de spliterator est d'encapsuler une plage dans ces éléments, par exemple via offset et limit ou end. Ensuite, le Fractionnement signifie simplement diviser cette plage , en adaptant le décalage de votre spliterator à la division position et création d'un nouveau spliterator représentant le préfixe, de "Old offset" à la position de division.

Before splitting:
      this spliterator: offset=x, end=y
After splitting:
      this spliterator: offset=z, end=y
  returned spliterator: offset=x, end=z

x <= z <= y

Tandis que dans le meilleur des cas, z est exactement au milieu entre les x et y, l'équilibre se divise, mais dans notre cas, nous allons légèrement adapter à produire des ensembles multiples de la taille de la page.

Cette logique fonctionne sans avoir besoin de récupérer des pages, donc si vous reportez la récupération des pages jusqu'au moment, le framework veut commencer la traversée, c'est-à-dire après, le fractionnement, les opérations d'extraction peuvent s'exécuter en parallèle. Le plus grand obstacle est le fait que vous devez aller chercher la première page afin d'en apprendre davantage sur le nombre total d'éléments. La solution ci-dessous sépare cette première extraction du reste, simplifiant l'implémentation. Bien sûr, il doit transmettre le résultat de cette première récupération de page, qui sera consommée lors de la première traversée (dans le cas séquentiel) ou renvoyée en tant que premier préfixe séparé, en acceptant un scission déséquilibrée à ce stade, mais ne plus avoir à y faire face par la suite.

public class PagingSpliterator<T> implements Spliterator<T> {
    public interface PageFetcher<T> {
        List<T> fetchPage(long offset, long limit, LongConsumer totalSizeSink);
    }
    public static final long DEFAULT_PAGE_SIZE = 100;

    public static <T> Stream<T> paged(PageFetcher<T> pageAccessor) {
        return paged(pageAccessor, DEFAULT_PAGE_SIZE, false);
    }
    public static <T> Stream<T> paged(PageFetcher<T> pageAccessor,
                                      long pageSize, boolean parallel) {
        if(pageSize<=0) throw new IllegalArgumentException();
        return StreamSupport.stream(() -> {
            PagingSpliterator<T> pgSp
                = new PagingSpliterator<>(pageAccessor, 0, 0, pageSize);
            pgSp.danglingFirstPage
                =spliterator(pageAccessor.fetchPage(0, pageSize, l -> pgSp.end=l));
            return pgSp;
        }, CHARACTERISTICS, parallel);
    }
    private static final int CHARACTERISTICS = IMMUTABLE|ORDERED|SIZED|SUBSIZED;

    private final PageFetcher<T> supplier;
    long start, end, pageSize;
    Spliterator<T> currentPage, danglingFirstPage;

    PagingSpliterator(PageFetcher<T> supplier,
            long start, long end, long pageSize) {
        this.supplier = supplier;
        this.start    = start;
        this.end      = end;
        this.pageSize = pageSize;
    }

    public boolean tryAdvance(Consumer<? super T> action) {
        for(;;) {
            if(ensurePage().tryAdvance(action)) return true;
            if(start>=end) return false;
            currentPage=null;
        }
    }
    public void forEachRemaining(Consumer<? super T> action) {
        do {
            ensurePage().forEachRemaining(action);
            currentPage=null;
        } while(start<end);
    }
    public Spliterator<T> trySplit() {
        if(danglingFirstPage!=null) {
            Spliterator<T> fp=danglingFirstPage;
            danglingFirstPage=null;
            start=fp.getExactSizeIfKnown();
            return fp;
        }
        if(currentPage!=null)
            return currentPage.trySplit();
        if(end-start>pageSize) {
            long mid=(start+end)>>>1;
            mid=mid/pageSize*pageSize;
            if(mid==start) mid+=pageSize;
            return new PagingSpliterator<>(supplier, start, start=mid, pageSize);
        }
        return ensurePage().trySplit();
    }
    /**
     * Fetch data immediately before traversing or sub-page splitting.
     */
    private Spliterator<T> ensurePage() {
        if(danglingFirstPage!=null) {
            Spliterator<T> fp=danglingFirstPage;
            danglingFirstPage=null;
            currentPage=fp;
            start=fp.getExactSizeIfKnown();
            return fp;
        }
        Spliterator<T> sp = currentPage;
        if(sp==null) {
            if(start>=end) return Spliterators.emptySpliterator();
            sp = spliterator(supplier.fetchPage(
                                 start, Math.min(end-start, pageSize), l->{}));
            start += sp.getExactSizeIfKnown();
            currentPage=sp;
        }
        return sp;
    }
    /**
     * Ensure that the sub-spliterator provided by the List is compatible with
     * ours, i.e. is {@code SIZED | SUBSIZED}. For standard List implementations,
     * the spliterators are, so the costs of dumping into an intermediate array
     * in the other case is irrelevant.
     */
    private static <E> Spliterator<E> spliterator(List<E> list) {
        Spliterator<E> sp = list.spliterator();
        if((sp.characteristics()&(SIZED|SUBSIZED))!=(SIZED|SUBSIZED))
            sp=Spliterators.spliterator(
                StreamSupport.stream(sp, false).toArray(), IMMUTABLE | ORDERED);
        return sp;
    }
    public long estimateSize() {
        if(currentPage!=null) return currentPage.estimateSize();
        return end-start;
    }
    public int characteristics() {
        return CHARACTERISTICS;
    }
}

Il utilise une interface fonctionnelle PageFetcher spécialisée qui peut être implémentée en appelant la méthode accept du rappel avec la taille totale résultante et en renvoyant une liste d'éléments. Le spliterator de pagination déléguera simplement au spliterator de la liste pour la traversée et dans le cas où la concurrence est significativement plus élevée que le nombre de pages résultant, il peut même bénéficier de la division de ceux-ci séparateurs de page, ce qui implique que les listes d'accès aléatoire, comme ArrayList, sont le type de liste préféré ici.

Adapter votre exemple de code à

private static <T> Stream<T> asSlowPagedSource(long pageSize, List<T> things) {
    return PagingSpliterator.paged( (offset, limit, totalSizeSink) -> {
        totalSizeSink.accept(things.size());
        if(offset>things.size()) return Collections.emptyList();
        int beginIndex = (int)offset;
        assert beginIndex==offset;
        int endIndex = Math.min(beginIndex+(int)limit, things.size());
        System.out.printf("Page %6d-%6d:\t%s%n",
                          beginIndex, endIndex, Thread.currentThread());
        // artificial slowdown
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        return things.subList(beginIndex, endIndex);
    }, pageSize, true);
}

Vous pouvez le tester comme

List<Integer> samples=IntStream.range(0, 555_000).boxed().collect(Collectors.toList());
List<Integer> result =asSlowPagedSource(10_000, samples) .collect(Collectors.toList());
if(!samples.equals(result))
    throw new AssertionError();

Étant donné suffisamment de cœurs de processeur libres, il montrera comment les pages sont récupérées simultanément, donc non ordonnées, alors que le résultat sera correctement dans l'ordre de rencontre. Vous pouvez également tester la concurrence des sous-pages qui s'applique lorsqu'il y a moins de pages:

Set<Thread> threads=ConcurrentHashMap.newKeySet();
List<Integer> samples=IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());
List<Integer> result=asSlowPagedSource(500_000, samples)
    .peek(x -> threads.add(Thread.currentThread()))
    .collect(Collectors.toList());
if(!samples.equals(result))
    throw new AssertionError();
System.out.println("Concurrency: "+threads.size());
5
répondu Holger 2016-07-11 16:46:54

Https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html

D'après ma compréhension, la vitesse de la division vient de l'immuabilité. Plus la source est immuable, plus le traitement est rapide, car l'immuabilité permet mieux le traitement parallèle ou plutôt le fractionnement.

L'idée semble être d'aborder les changements, le cas échéant, à la source le mieux possible avant de la lier dans son ensemble (meilleur) ou en parties (généralement le cas et donc le vôtre et bien d'autres challenge) à spliterators.

Dans votre cas, cela pourrait signifier que les tailles de page ont été respectées en premier au lieu de:

//.. in case the limit was not respected exactly. this.offset += result.getPage().size();

Cela peut aussi signifier que l'alimentation du cours d'eau doit être préparée et non utilisée comme source directe.

Il y a un exemple à la fin du document de " comment un framework de calcul parallèle, tel que java.util.paquet de flux, utiliserait Spliterator dans un calcul parallèle "

Notez que c'est comment stream utiliserait spliterator et non comment spliterator utilise un flux comme source.

Il y a une méthode "compute" intéressante à la fin de l'exemple.

PS si jamais vous obtenez une classe PageSpliterator efficace Générique, assurez-vous d'en informer certains d'entre nous.

Acclamations.

0
répondu 2016-07-10 06:18:18