La configuration asynchrone du processeur au printemps pour une meilleure performance
j'ai un problème avec la création de processeur asynchrone dans Spring Batch.
Mon processeur devient ID
reader
et la création d'objet basé sur la réponse de SOAP
appel. Parfois pour 1 entrée (ID
) il doit y avoir par exemple 60-100 SOAP
appels et parfois seulement 1. J'ai essayé de faire du multithread step qu'il était en train de traiter.g 50 entrées à la fois mais c'était inutile parce que 49 threads ont fait leur travail en 1 seconde et ont été bloqués, attendant celle qui faisait 60-100 SOAP
appeler. Maintenant, j'utilise AsyncItemProcessor
+AsyncItemWriter
mais cette solution fonctionne lentement pour moi. Comme mon entrée (IDs
) est grand, environ 25k éléments lus à partir de DB je voudrais commencer ~50-100 entrées à la fois.
Voici ma configuration:
@Configuration
public class BatchConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
private DatabaseConfig databaseConfig;
@Value(value = "classpath:Categories.txt")
private Resource categories;
@Bean
public Job processJob() throws Exception {
return jobBuilderFactory.get("processJob").incrementer(new RunIdIncrementer()).listener(listener()).flow(orderStep1()).end().build();
}
@Bean
public Step orderStep1() throws Exception {
return stepBuilderFactory.get("orderStep1").<Category, CategoryDailyResult>chunk(1).reader(reader()).processor(asyncItemProcessor()).writer(asyncItemWriter()).taskExecutor(taskExecutor()).build();
}
@Bean
public JobExecutionListener listener() {
return new JobCompletionListener();
}
@Bean
public ItemWriter asyncItemWriter() {
AsyncItemWriter<CategoryDailyResult> asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(itemWriter());
return asyncItemWriter;
}
@Bean
public ItemWriter<CategoryDailyResult> itemWriter(){
return new Writer();
}
@Bean
public ItemProcessor asyncItemProcessor() {
AsyncItemProcessor<Category, CategoryDailyResult> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(itemProcessor());
asyncItemProcessor.setTaskExecutor(taskExecutor());
return asyncItemProcessor;
}
@Bean
public ItemProcessor<Category, CategoryDailyResult> itemProcessor(){
return new Processor();
}
@Bean
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(50);
return taskExecutor;
}
@Bean(destroyMethod = "")
public ItemReader<Category> reader() throws Exception {
String query = "select c from Category c where not exists elements(c.children)";
JpaPagingItemReader<Category> reader = new JpaPagingItemReader<>();
reader.setSaveState(false);
reader.setQueryString(query);
reader.setEntityManagerFactory(databaseConfig.entityManagerFactory().getObject());
reader.setPageSize(1);
return reader;
}
}
Comment puis-je booster mon application? Peut-être suis-je en train de faire quelque chose de mal? Tous les commentaires de bienvenue ;)
@Edit: Pour l'entrée des id: 1 à 100 je veux E. g 50 threads qui exécutent le processeur. Je tiens à ne pas bloquer les uns les autres: Thread1 processus entrée "1" pendant 2 minutes et à ce moment je veux Thread2 pour traiter entrée "2", "8", "64" qui sont petites et s'exécutent en quelques secondes.
@Edit2:
Mon but:
J'ai 25k Id dans la base de données, je les ai lues avec JpaPagingItemReader
et chaque ID est traité par processeur. Chaque élément est indépendant l'un de l'autre. Pour chaque pièce D'identité que je fais!--3--> appeler 0-100 fois en boucle et puis je crée L'objet que je passe à Writer
et sauvegarder dans la base de données. Comment obtenir la meilleure performance pour une telle tâche?
2 réponses
Vous devriez partager votre travail. Ajouter une étape partitionnée comme suit:
@Bean
public Step partitionedOrderStep1(Step orderStep1) {
return stepBuilder.get("partitionedOrderStep1")
.partitioner(orderStep1)
.partitioner("orderStep1", new SimplePartitioner())
.taskExecutor(taskExecutor())
.gridSize(10) //Number of concurrent partitions
.build();
}
alors utilisez cette étape dans votre définition de travail. Le. gridSize () call configure le nombre de partitions à exécuter simultanément. Si l'un de vos objets de lecture, de traitement ou D'écriture est stateful, vous devez l'annoter avec @StepScope.
@KCrookedHand: j'ai traité avec scénario similaire, j'ai dû lire quelques milliers et besoin d'appeler le service SOAP (j'ai injecté ceci dans itemReader) pour les critères de correspondance.
ma configuration ressemble à ci-dessous, en gros vous avez quelques options pour réaliser le traitement parallèle et deux d'entre elles sont le "partitionnement" et L'approche du "Serveur Client". J'ai choisi partitionnement parce que je vais avoir plus de contrôle sur le nombre de partitions dont j'ai besoin basé sur mon données.
Veuillez ThreadPoolTaskExecutor @MichaelMinella mentionné, ci-dessous, Étape-Exécution à l'aide de tasklet où il est applicable.
<batch:step id="notificationMapper">
<batch:partition partitioner="partitioner"
step="readXXXStep" />
</batch:step>
</batch:job>
<batch:step id="readXXXStep">
<batch:job ref="jobRef" job-launcher="jobLauncher"
job-parameters-extractor="jobParameterExtractor" />
</batch:step>
<batch:job id="jobRef">
<batch:step id="dummyStep" next="skippedItemsDecision">
<batch:tasklet ref="dummyTasklet"/>
<batch:listeners>
<batch:listener ref="stepListener" />
</batch:listeners>
</batch:step>
<batch:step id="xxx.readItems" next="xxx.then.finish">
<batch:tasklet>
<batch:chunk reader="xxxChunkReader" processor="chunkProcessor"
writer="itemWriter" commit-interval="100">
</batch:chunk>
</batch:tasklet>
<batch:listeners>
<batch:listener ref="taskletListener" />
</batch:listeners>
</batch:step>
...