Comment traiter des lignes logiquement reliées après Readereader dans SpringBatch?

Scénario

pour simplifier, supposons que j'ai un lecteur D'Item qui me renvoie 25 lignes.

  1. Les 10 premières lignes appartiennent à l'étudiant Un

  2. les 5 suivants appartiennent à l'élève B

  3. et les 10 restants appartiennent à l'élève C

je veux les rassembler, logiquement dire par studentId et les aplatir pour finir avec une rangée par étudiant.

Problème

Si je comprends bien, le réglage de l'intervalle de validation à 5 procédez de la manière suivante:

  1. envoyer 5 lignes au processeur (qui les regroupera ou fera n'importe quelle logique commerciale à laquelle je le dirai).
  2. après le traitement écrira 5 lignes.
  3. puis il le refera pour les 5 prochaines rangées et ainsi de suite.

Si cela est vrai, alors pour les cinq prochaines je vais vérifier le déjà écrit les uns, les faire sortir les agréger à ceux que je suis en train de traiter et les écrire à nouveau.

personnellement, je ne fais pas ça.

  1. Quelle est la meilleure pratique pour gérer une situation comme celle-ci dans le Lot de printemps?

Autres

parfois j'ai l'impression qu'il est beaucoup plus facile d'écrire un programme principal JDBC printemps régulier et puis j'ai le contrôle total de ce que je veux faire. Cependant, je voulais profiter du référentiel d'emplois surveillance de l'état de la tâche, la capacité de redémarrer, skip, travail et écouteurs étape....

Mon Code De Lot De Printemps

Mon module-contexte.xml

   <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

    <description>Example job to get you started. It provides a skeleton for a typical batch application.</description>

    <batch:job id="job1">
        <batch:step id="step1"  >           
            <batch:tasklet transaction-manager="transactionManager" start-limit="100" >             
                 <batch:chunk reader="attendanceItemReader"
                              processor="attendanceProcessor" 
                              writer="attendanceItemWriter" 
                              commit-interval="10" 
                 />

            </batch:tasklet>
        </batch:step>
    </batch:job> 

    <bean id="attendanceItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader"> 
        <property name="dataSource">
            <ref bean="sourceDataSource"/>
        </property> 
        <property name="sql"                                                    
                  value="select s.student_name ,s.student_id ,fas.attendance_days ,fas.attendance_value from K12INTEL_DW.ftbl_attendance_stumonabssum fas inner join k12intel_dw.dtbl_students s on fas.student_key = s.student_key inner join K12INTEL_DW.dtbl_schools ds on fas.school_key = ds.school_key inner join k12intel_dw.dtbl_school_dates dsd on fas.school_dates_key = dsd.school_dates_key where dsd.rolling_local_school_yr_number = 0 and ds.school_code = ? and s.student_activity_indicator = 'Active' and fas.LOCAL_GRADING_PERIOD = 'G1' and s.student_current_grade_level = 'Gr 9' order by s.student_id"/>
        <property name="preparedStatementSetter" ref="attendanceStatementSetter"/>           
        <property name="rowMapper" ref="attendanceRowMapper"/> 
    </bean> 

    <bean id="attendanceStatementSetter" class="edu.kdc.visioncards.preparedstatements.AttendanceStatementSetter"/>

    <bean id="attendanceRowMapper" class="edu.kdc.visioncards.rowmapper.AttendanceRowMapper"/>

    <bean id="attendanceProcessor" class="edu.kdc.visioncards.AttendanceProcessor" />  

    <bean id="attendanceItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"> 
        <property name="resource" value="file:target/outputs/passthrough.txt"/> 
        <property name="lineAggregator"> 
            <bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" /> 
        </property> 
    </bean> 

</beans>

mes cours de soutien pour le lecteur.

A PreparedStatementSetter

package edu.kdc.visioncards.preparedstatements;

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.springframework.jdbc.core.PreparedStatementSetter;

public class AttendanceStatementSetter implements PreparedStatementSetter {

    public void setValues(PreparedStatement ps) throws SQLException {

        ps.setInt(1, 7);

    }

}

et un RowMapper

package edu.kdc.visioncards.rowmapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import edu.kdc.visioncards.dto.AttendanceDTO;

public class AttendanceRowMapper<T> implements RowMapper<AttendanceDTO> {

    public static final String STUDENT_NAME = "STUDENT_NAME";
    public static final String STUDENT_ID = "STUDENT_ID";
    public static final String ATTENDANCE_DAYS = "ATTENDANCE_DAYS";
    public static final String ATTENDANCE_VALUE = "ATTENDANCE_VALUE";

    public AttendanceDTO mapRow(ResultSet rs, int rowNum) throws SQLException {

        AttendanceDTO dto = new AttendanceDTO();
        dto.setStudentId(rs.getString(STUDENT_ID));
        dto.setStudentName(rs.getString(STUDENT_NAME));
        dto.setAttDays(rs.getInt(ATTENDANCE_DAYS));
        dto.setAttValue(rs.getInt(ATTENDANCE_VALUE));

        return dto;
    }
}

Mon processeur

package edu.kdc.visioncards;

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.ItemProcessor;

import edu.kdc.visioncards.dto.AttendanceDTO;

public class AttendanceProcessor implements ItemProcessor<AttendanceDTO, Map<Integer, AttendanceDTO>> {

    private Map<Integer, AttendanceDTO> map = new HashMap<Integer, AttendanceDTO>();

    public Map<Integer, AttendanceDTO> process(AttendanceDTO dto) throws Exception {

        if(map.containsKey(new Integer(dto.getStudentId()))){

            AttendanceDTO attDto = (AttendanceDTO)map.get(new Integer(dto.getStudentId()));
            attDto.setAttDays(attDto.getAttDays() + dto.getAttDays());
            attDto.setAttValue(attDto.getAttValue() + dto.getAttValue());

        }else{
            map.put(new Integer(dto.getStudentId()), dto);
        }
        return map;
    }

}

Mes préoccupations de code ci-dessus

dans le processeur, je crée un HashMap rangées je vérifie si J'ai déjà cet élève dans la carte, si ce N'est pas là je l'ajoute. Si elle est déjà là, je prends l'il obtenir les valeurs qui m'intéressent et les ajouter avec la ligne que je suis en cours de traitement.

après cela, Spring Batch Framework écrit dans un fichier selon ma configuration

Ma question est comme suit:

  1. je ne veux pas aller à l'écrivain. Je veux traiter toutes les lignes restantes. Comment puis-je garder cette Carte J'ai créé en mémoire pour la prochaine série de lignes qui doivent passer par ce même Processeur? À chaque fois, une ligne est traitée via AttendanceProcessor la carte est initialisée. Dois-je mettre l'initialisation de la carte dans un bloc statique?
17
demandé sur Viriato 2012-01-12 19:22:09

5 réponses

j'ai toujours suivre ce modèle:

  1. je fais mon lecteur scope pour être "step", et dans @PostConstruct je vais chercher les résultats, et les mettre dans une carte
  2. dans le processeur, je convertis la collectionassociée en liste inscriptible, et d'envoyer l'écriture de la liste
  3. Dans ItemWriter, je persiste, l'écriture d'article(s), selon le cas
2
répondu sasi_personal 2013-07-22 13:05:36

parce que vous avez changé votre question j'ajoute une nouvelle réponse

si les étudiants sont commandés alors il n'y a pas besoin de liste/carte, vous pouvez utiliser exactement un objet studentObject sur le processeur pour garder le "courant" et agréger sur elle jusqu'à ce qu'il y ait un nouveau (lire: changement d'id)

si les étudiants ne sont pas commandés vous ne saurez jamais quand un étudiant spécifique est "terminé" et vous auriez à garder tous les étudiants dans une carte qui ne peut pas être écrite jusqu'à la fin de la lecture complète séquence

attention:

  • le processeur doit savoir quand le lecteur est épuisé
  • il est difficile de le faire fonctionner avec n'importe quel commettre de taux et de "id" concept si vous regroupez des éléments qui sont en quelque sorte identiques, le processeur ne peut tout simplement pas savoir si l'élément en cours de traitement est le dernier
  • fondamentalement, l'usecase est soit entièrement résolu au niveau du lecteur ou au niveau de l'auteur (Voir Autres réponse)
private SimpleItem currentItem;
private StepExecution stepExecution;

@Override
public SimpleItem process(SimpleItem newItem) throws Exception {
    SimpleItem returnItem = null;

    if (currentItem == null) {
        currentItem = new SimpleItem(newItem.getId(), newItem.getValue());
    } else if (currentItem.getId() == newItem.getId()) {
        // aggregate somehow
        String value = currentItem.getValue() + newItem.getValue();
        currentItem.setValue(value);
    } else {
        // "clone"/copy currentItem
        returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue());
        // replace currentItem
        currentItem = newItem;
    }

    // reader exhausted?
    if(stepExecution.getExecutionContext().containsKey("readerExhausted")
            && (Boolean)stepExecution.getExecutionContext().get("readerExhausted")
            && currentItem.getId() == stepExecution.getExecutionContext().getInt("lastItemId")) {
        returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue());
    }

    return returnItem;
}
1
répondu Michael Pralow 2012-01-17 22:23:17

Dans mon application, j'ai créé un CollectingJdbcCursorItemReader qui s'étend de la norme JdbcCursorItemReader et effectue exactement ce dont vous avez besoin. En interne, il utilise mon CollectingRowMapper: une extension de la norme RowMapper qui correspond à plusieurs lignes reliées à un objet.

voici le code du ItemReader, le code de CollectingRowMapper interface, et un abrégé de la mise en œuvre de celle-ci est disponible dans une autre réponse de la mienne.

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.jdbc.core.RowMapper;

/**
 * A JdbcCursorItemReader that uses a {@link CollectingRowMapper}.
 * Like the superclass this reader is not thread-safe.
 * 
 * @author Pino Navato
 **/
public class CollectingJdbcCursorItemReader<T> extends JdbcCursorItemReader<T> {

    private CollectingRowMapper<T> rowMapper;
    private boolean firstRead = true;


    /**
     * Accepts a {@link CollectingRowMapper} only.
     **/
    @Override
    public void setRowMapper(RowMapper<T> rowMapper) {
        this.rowMapper = (CollectingRowMapper<T>)rowMapper;
        super.setRowMapper(rowMapper);
     }


    /**
     * Read next row and map it to item.
     **/
    @Override
    protected T doRead() throws Exception {
        if (rs == null) {
            throw new ReaderNotOpenException("Reader must be open before it can be read.");
        }

        try {
            if (firstRead) {
                if (!rs.next()) {  //Subsequent calls to next() will be executed by rowMapper
                    return null;
                }
                firstRead = false;
            } else if (!rowMapper.hasNext()) {
                return null;
            }
            T item = readCursor(rs, getCurrentItemCount());
            return item;
        }
        catch (SQLException se) {
            throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
        }
    }

    @Override
    protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
        T result = super.readCursor(rs, currentRow);
        setCurrentItemCount(rs.getRow());
        return result;
    }

}

Vous pouvez l'utiliser tout comme le classique JdbcCursorItemReader: la seule exigence est que vous en fournir un CollectingRowMapper au lieu du Classique RowMapper.

1
répondu Pino 2017-09-21 14:30:48

fondamentalement, vous parlez de traitement par lots avec changement D'IDs (1), où le lot doit garder une trace du changement

pour le printemps/spring-batch nous en parler:

  • ItemWriter qui vérifie la liste des éléments pour un changement d'id
  • avant le changement les éléments sont stockés dans un datastore temporaire (2) (Liste, Map, whatever), et ne sont pas
  • lorsque l'id change, le code d'activité agrégeant / aplatissant les éléments dans le datastore et un élément doit être écrit, maintenant le datastore peut être utilisé pour les éléments suivants avec le prochain id
  • ce concept a besoin d'un lecteur qui indique l'étape "je suis épuisé" pour bien vider le datastore temporaire à la fin des éléments (Fichier/Base de données)

voici un exemple de code brut et simple

@Override
public void write(List<? extends SimpleItem> items) throws Exception {

    // setup with first sharedId at startup
    if (currentId == null){
        currentId = items.get(0).getSharedId();
    }

    // check for change of sharedId in input
    // keep items in temporary dataStore until id change of input
    // call delegate if there is an id change or if the reader is exhausted
    for (SimpleItem item : items) {
        // already known sharedId, add to tempData
        if (item.getSharedId() == currentId) {
            tempData.add(item);
        } else {
            // or new sharedId, write tempData, empty it, keep new id
            // the delegate does the flattening/aggregating
            delegate.write(tempData);
            tempData.clear();
            currentId = item.getSharedId();
            tempData.add(item);
        }
    }

    // check if reader is exhausted, flush tempData
    if ((Boolean) stepExecution.getExecutionContext().get("readerExhausted")
            && tempData.size() > 0) {
        delegate.write(tempData);
        // optional delegate.clear(); 
    }
}

(1) en supposant que les articles sont ordonnés par un ID (peut être composite aussi)

(2)une hashmap printemps bean pour le thread sécurité

0
répondu Michael Pralow 2012-01-16 10:40:16

utilisez Step Execution Listener et stockez les enregistrements comme map vers le StepExecutionContext , vous pouvez ensuite les grouper dans le rédacteur ou le rédacteur listener et l'écrire à la fois

0
répondu Sundar 2016-02-24 05:32:31