La configuration asynchrone du processeur au printemps pour une meilleure performance

j'ai un problème avec la création de processeur asynchrone dans Spring Batch. Mon processeur devient IDreader et la création d'objet basé sur la réponse de SOAP appel. Parfois pour 1 entrée (ID) il doit y avoir par exemple 60-100 SOAP appels et parfois seulement 1. J'ai essayé de faire du multithread step qu'il était en train de traiter.g 50 entrées à la fois mais c'était inutile parce que 49 threads ont fait leur travail en 1 seconde et ont été bloqués, attendant celle qui faisait 60-100 SOAP appeler. Maintenant, j'utilise AsyncItemProcessor+AsyncItemWriter mais cette solution fonctionne lentement pour moi. Comme mon entrée (IDs) est grand, environ 25k éléments lus à partir de DB je voudrais commencer ~50-100 entrées à la fois.

Voici ma configuration:

@Configuration
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DatabaseConfig databaseConfig;
    @Value(value = "classpath:Categories.txt")
    private Resource categories;

    @Bean
    public Job processJob() throws Exception {
        return jobBuilderFactory.get("processJob").incrementer(new RunIdIncrementer()).listener(listener()).flow(orderStep1()).end().build();
    }

    @Bean
    public Step orderStep1() throws Exception {
        return stepBuilderFactory.get("orderStep1").<Category, CategoryDailyResult>chunk(1).reader(reader()).processor(asyncItemProcessor()).writer(asyncItemWriter()).taskExecutor(taskExecutor()).build();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionListener();
    }


    @Bean
    public ItemWriter asyncItemWriter() {
        AsyncItemWriter<CategoryDailyResult> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(itemWriter());
        return asyncItemWriter;
    }

    @Bean
    public ItemWriter<CategoryDailyResult> itemWriter(){
        return new Writer();
    }

    @Bean
    public ItemProcessor asyncItemProcessor() {
        AsyncItemProcessor<Category, CategoryDailyResult> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(itemProcessor());
        asyncItemProcessor.setTaskExecutor(taskExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemProcessor<Category, CategoryDailyResult> itemProcessor(){
        return new Processor();
    }

    @Bean
    public TaskExecutor taskExecutor(){
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(50);
        return taskExecutor;
    }

    @Bean(destroyMethod = "")
    public ItemReader<Category> reader() throws Exception {
        String query = "select c from Category c where not exists elements(c.children)";

        JpaPagingItemReader<Category> reader = new JpaPagingItemReader<>();
        reader.setSaveState(false);
        reader.setQueryString(query);
        reader.setEntityManagerFactory(databaseConfig.entityManagerFactory().getObject());
        reader.setPageSize(1);

        return reader;
    }
}

Comment puis-je booster mon application? Peut-être suis-je en train de faire quelque chose de mal? Tous les commentaires de bienvenue ;)

@Edit: Pour l'entrée des id: 1 à 100 je veux E. g 50 threads qui exécutent le processeur. Je tiens à ne pas bloquer les uns les autres: Thread1 processus entrée "1" pendant 2 minutes et à ce moment je veux Thread2 pour traiter entrée "2", "8", "64" qui sont petites et s'exécutent en quelques secondes.

@Edit2: Mon but: J'ai 25k Id dans la base de données, je les ai lues avec JpaPagingItemReader et chaque ID est traité par processeur. Chaque élément est indépendant l'un de l'autre. Pour chaque pièce D'identité que je fais!--3--> appeler 0-100 fois en boucle et puis je crée L'objet que je passe à Writer et sauvegarder dans la base de données. Comment obtenir la meilleure performance pour une telle tâche?

14
demandé sur crooked 2017-08-18 17:26:33

2 réponses

Vous devriez partager votre travail. Ajouter une étape partitionnée comme suit:

@Bean
public Step partitionedOrderStep1(Step orderStep1) {
    return stepBuilder.get("partitionedOrderStep1")
            .partitioner(orderStep1)
            .partitioner("orderStep1", new SimplePartitioner())
            .taskExecutor(taskExecutor())
            .gridSize(10)  //Number of concurrent partitions
            .build();
}

alors utilisez cette étape dans votre définition de travail. Le. gridSize () call configure le nombre de partitions à exécuter simultanément. Si l'un de vos objets de lecture, de traitement ou D'écriture est stateful, vous devez l'annoter avec @StepScope.

1
répondu Joe Chiavaroli 2017-11-30 21:59:38

@KCrookedHand: j'ai traité avec scénario similaire, j'ai dû lire quelques milliers et besoin d'appeler le service SOAP (j'ai injecté ceci dans itemReader) pour les critères de correspondance.

ma configuration ressemble à ci-dessous, en gros vous avez quelques options pour réaliser le traitement parallèle et deux d'entre elles sont le "partitionnement" et L'approche du "Serveur Client". J'ai choisi partitionnement parce que je vais avoir plus de contrôle sur le nombre de partitions dont j'ai besoin basé sur mon données.

Veuillez ThreadPoolTaskExecutor @MichaelMinella mentionné, ci-dessous, Étape-Exécution à l'aide de tasklet où il est applicable.

<batch:step id="notificationMapper">
            <batch:partition partitioner="partitioner"
                step="readXXXStep" />
        </batch:step>
    </batch:job>


    <batch:step id="readXXXStep">
        <batch:job ref="jobRef" job-launcher="jobLauncher"
            job-parameters-extractor="jobParameterExtractor" />
    </batch:step>

    <batch:job id="jobRef">

        <batch:step id="dummyStep" next="skippedItemsDecision">
            <batch:tasklet ref="dummyTasklet"/>
            <batch:listeners>
                <batch:listener ref="stepListener" />
            </batch:listeners>
        </batch:step>

        <batch:step id="xxx.readItems" next="xxx.then.finish">
            <batch:tasklet>
                <batch:chunk reader="xxxChunkReader" processor="chunkProcessor"
                    writer="itemWriter" commit-interval="100">
                </batch:chunk>
            </batch:tasklet>
            <batch:listeners>
                <batch:listener ref="taskletListener" />
            </batch:listeners>
        </batch:step>

        ...
0
répondu Ashok Gudise 2017-09-26 19:13:07