Comment insérer des données parellel dans trois tables différentes

J'ai un stored procedure qui va insérer l'essentiel des dossiers, maintenant est-il possible d'insérer des données dans 3 tables en parallèle;

  • Premier table insertion de 1 million d'enregistrements.
  • deuxième table insertion de 1,5 million d'enregistrements.
  • Troisième table insertion de 500 enregistrements

Selon mes connaissances, l'insertion de la procédure se passe l'une après l'autre.

Alors, comment puis-je implémenter le chargement en parallèle?

23
demandé sur Chanukya 2016-09-22 09:56:05

5 réponses

Les instructions

S'exécutent de manière synchrone dans un lot T-SQL. Pour exécuter plusieurs instructions de manière asynchrone et en parallèle à partir d'une procédure stockée, vous devez utiliser plusieurs connexions de base de données simultanées. Notez que la partie délicate avec l'exécution asynchrone est de déterminer non seulement quand toutes les tâches sont terminées, mais aussi si elles ont réussi ou échoué.

Méthode 1: paquet SSIS

Créez un package SSIS pour exécuter les 3 instructions SQL en parallèle. Dans SQL 2012 et versions ultérieures, exécutez le package en utilisant les procédures stockées du catalogue SSIS. Avant SQL 2012, vous devrez créer un travail D'Agent SQL pour le package et le lancer avec sp_start_job.

Vous devez vérifier l'état d'exécution SSIS ou L'état de la tâche SQL Agent pour déterminer l'achèvement et le résultat du succès/échec.

Méthode 2: Powershell et agent SQL

Exécute une tâche D'Agent SQL qui exécute un script Powershell qui exécute les requêtes en parallèle à L'aide de L'arrière-plan Powershell emplois (commande Start-Job). Le script peut renvoyer un code de sortie, zéro pour le succès et non-zéro pour l'échec, de sorte que SQL Agent peut déterminer s'il a réussi. Vérifiez L'État du travail de L'Agent SQL pour déterminer l'achèvement et le résultat du succès/échec.

Méthode 3: plusieurs tâches D'Agent SQL

Exécute simultanément plusieurs tâches D'Agent SQL, chacune avec une étape de tâche T-SQL contenant le script d'importation. Vérifiez L'État de chaque tâche SQL Agent pour déterminer l'achèvement et le succès / échec résultat.

Méthode 4: Courtier De Services Utilisez un processus activé par la file d'attente pour exécuter les scripts d'importation en parallèle. Cela peut être obtus si vous n'avez pas utilisé Service broker avant et il est important de suivre les modèles vérifiés. J'ai inclus un exemple pour vous aider à démarrer (remplacez THROW par RAISERROR pour pre-SQL 2012). Service Broker doit être activé dans la base de données, ce qui est activé par défaut mais désactivé après une restauration ou une attache.

USE YourDatabase;
Go

--create proc that will be automatically executed (activated) when requests are waiting
CREATE PROC dbo.ExecuteTSqlTask
AS
SET NOCOUNT ON;

DECLARE
      @TSqlJobConversationHandle uniqueidentifier = NEWID()
    , @TSqlExecutionRequestMessage xml
    , @TSqlExecutionResultMessage xml
    , @TSqlExecutionResult varchar(10)
    , @TSqlExecutionResultDetails nvarchar(MAX)
    , @TSqlScript nvarchar(MAX)
    , @TSqlTaskName sysname
    , @RowsAffected int
    , @message_type_name sysname;

WHILE 1 = 1
BEGIN

    --get the next task to execute
    WAITFOR (
        RECEIVE TOP (1)
              @TSqlJobConversationHandle = conversation_handle
            , @TSqlExecutionRequestMessage = CAST(message_body AS xml)
            , @message_type_name = message_type_name
        FROM dbo.TSqlExecutionQueue
        ), TIMEOUT 1000;

    IF @@ROWCOUNT = 0
    BEGIN
        --no work to do - exit
        BREAK;
    END;

    IF @message_type_name = N'TSqlExecutionRequest'
    BEGIN

        --get task name and script
        SELECT
              @TSqlTaskName = @TSqlExecutionRequestMessage.value('(/TSqlTaskName)[1]', 'sysname')
            , @TSqlScript = @TSqlExecutionRequestMessage.value('(/TSqlScript)[1]', 'nvarchar(MAX)');

        --execute script
        BEGIN TRY
            EXEC sp_executesql @TSqlScript;
            SET @RowsAffected = @@ROWCOUNT;
            SET @TSqlExecutionResult = 'Completed';
            SET @TSqlExecutionResultDetails = CAST(@RowsAffected as varchar(10)) + ' rows affected';
        END TRY
        BEGIN CATCH
            SET @TSqlExecutionResult = 'Erred';
            SET @TSqlExecutionResultDetails = 
                  'Msg ' + CAST(ERROR_NUMBER() AS varchar(10))
                + ', Level ' + CAST(ERROR_SEVERITY() AS varchar(2))
                + ', State ' + CAST(ERROR_STATE() AS varchar(10))
                + ', Line ' + CAST(ERROR_LINE() AS varchar(10))
                + ': ' + ERROR_MESSAGE();
        END CATCH;

        --send execution result back to initiator
        SET @TSqlExecutionResultMessage = '<TSqlTaskName /><TSqlExecutionResult /><TSqlExecutionResultDetails />';
        SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlTaskName")} into (/TSqlTaskName)[1] ');
        SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlExecutionResult")} into (/TSqlExecutionResult)[1] ');
        SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlExecutionResultDetails")} into (/TSqlExecutionResultDetails)[1] ');
        SEND ON CONVERSATION @TSqlJobConversationHandle
            MESSAGE TYPE TSqlExecutionResult
            (@TSqlExecutionResultMessage);

    END
    ELSE
    BEGIN
        IF @message_type_name = N'TSqlJobComplete'
        BEGIN
            --service has ended conversation so we're not going to get any more execution requests
            END CONVERSATION @TSqlJobConversationHandle;
        END
        ELSE
        BEGIN
            END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type received by ExecuteTSqlTask';
            RAISERROR('Unexpected message type received (%s) by ExecuteTSqlTask', 16, 1, @message_type_name);
        END;
    END;
END;
GO

CREATE QUEUE dbo.TSqlResultQueue;
CREATE QUEUE dbo.TSqlExecutionQueue
    WITH STATUS=ON,
    ACTIVATION (
          STATUS = ON
        , PROCEDURE_NAME = dbo.ExecuteTSqlTask
        , MAX_QUEUE_READERS = 3 --max number of concurrent activated proc instances 
        , EXECUTE AS OWNER
        );
CREATE MESSAGE TYPE TSqlExecutionRequest VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE TSqlExecutionResult VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE TSqlJobComplete VALIDATION = WELL_FORMED_XML;
CREATE CONTRACT TSqlExecutionContract (
      TSqlExecutionRequest SENT BY INITIATOR
    , TSqlJobComplete SENT BY INITIATOR
    , TSqlExecutionResult SENT BY TARGET
    );
CREATE SERVICE TSqlJobService ON QUEUE dbo.TSqlResultQueue ([TSqlExecutionContract]);
CREATE SERVICE TSqlExecutorService ON QUEUE dbo.TSqlExecutionQueue ([TSqlExecutionContract]);
GO

CREATE PROC dbo.ExecuteParallelImportScripts
AS
SET NOCOUNT ON;

DECLARE
      @TSqlJobConversationHandle uniqueidentifier
    , @TSqlExecutionRequestMessage xml
    , @TSqlExecutionResultMessage xml
    , @TSqlExecutionResult varchar(10)
    , @TSqlExecutionResultDetails nvarchar(MAX)
    , @TSqlTaskName sysname
    , @CompletedCount int = 0
    , @ErredCount int = 0
    , @message_type_name sysname;

DECLARE @TsqlTask TABLE(
      TSqlTaskName sysname NOT NULL PRIMARY KEY 
    , TSqlScript nvarchar(MAX) NOT NULL
    );

BEGIN TRY

    --insert a row for each import task
    INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript) 
        VALUES(N'ImportScript1', N'INSERT INTO dbo.Table1 SELECT * FROM dbo.Table1Staging;');
    INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript) 
        VALUES(N'ImportScript2', N'INSERT INTO dbo.Table2 SELECT * FROM dbo.Table2Staging;');
    INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript) 
        VALUES(N'ImportScript3', N'INSERT INTO dbo.Table3 SELECT * FROM dbo.Table3Staging;');

    --start a conversation for this import process
    BEGIN DIALOG CONVERSATION @TsqlJobConversationHandle
        FROM SERVICE TSqlJobService
        TO SERVICE 'TSqlExecutorService', 'CURRENT DATABASE'
        ON CONTRACT TSqlExecutionContract
        WITH ENCRYPTION = OFF;

    --send import tasks to executor service for parallel execution
    DECLARE JobTasks CURSOR LOCAL FAST_FORWARD FOR
        SELECT (SELECT TSqlTaskName, TSqlScript
            FROM @TsqlTask AS task 
            WHERE task.TSqlTaskName = job.TSqlTaskName
            FOR XML PATH(''), TYPE) AS TSqlExecutionRequest
        FROM @TsqlTask AS job;
    OPEN JobTasks;
    WHILE 1 = 1
    BEGIN
        FETCH NEXT FROM JobTasks INTO @TSqlExecutionRequestMessage;
        IF @@FETCH_STATUS = -1 BREAK;
        SEND ON CONVERSATION @TSqlJobConversationHandle
            MESSAGE TYPE TSqlExecutionRequest
            (@TSqlExecutionRequestMessage);
    END;
    CLOSE JobTasks;
    DEALLOCATE JobTasks;

    --get each parallel task execution result until all are complete
    WHILE 1 = 1
    BEGIN

        --get next task result
        WAITFOR (
            RECEIVE TOP (1)
                  @TSqlExecutionResultMessage = CAST(message_body AS xml)
                , @message_type_name = message_type_name
            FROM dbo.TSqlResultQueue
            WHERE conversation_handle = @TSqlJobConversationHandle
            ), TIMEOUT 1000;

        IF @@ROWCOUNT <> 0
        BEGIN

            IF @message_type_name = N'TSqlExecutionResult'
            BEGIN

                --get result of import script execution
                SELECT
                      @TSqlTaskName = @TSqlExecutionResultMessage.value('(/TSqlTaskName)[1]', 'sysname')
                    , @TSqlExecutionResult = @TSqlExecutionResultMessage.value('(/TSqlExecutionResult)[1]', 'varchar(10)')
                    , @TSqlExecutionResultDetails = COALESCE(@TSqlExecutionResultMessage.value('(/TSqlExecutionResultDetails)[1]', 'nvarchar(MAX)'), N'');
                RAISERROR('Import task %s %s: %s', 0, 0, @TSqlTaskName, @TSqlExecutionResult, @TSqlExecutionResultDetails) WITH NOWAIT;
                IF @TSqlExecutionResult = 'Completed'
                BEGIN
                    SET @CompletedCount += 1;
                END
                ELSE
                BEGIN
                    SET @ErredCount += 1;
                END;

                --remove task from tracking table after completion
                DELETE FROM @TSqlTask
                WHERE TSqlTaskName = @TSqlTaskName;

                IF NOT EXISTS(SELECT 1 FROM @TsqlTask)
                BEGIN
                    --all tasks are done - send TSqlJobComplete message to instruct executor service to end conversation
                    SEND ON CONVERSATION @TSqlJobConversationHandle
                        MESSAGE TYPE TSqlJobComplete;
                END
            END
            ELSE
            BEGIN
                IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
                BEGIN
                    --executor service has ended conversation so we're done
                    END CONVERSATION @TSqlJobConversationHandle;
                    BREAK;
                END
                ELSE
                BEGIN
                    END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type received by ExecuteParallelInserts';
                    RAISERROR('Unexpected message type received (%s) by ExecuteParallelInserts', 16, 1, @message_type_name);
                END;
            END
        END;
    END;
    RAISERROR('Import processing completed. CompletedCount=%d, ErredCount=%d.', 0, 0, @CompletedCount, @ErredCount);
END TRY
BEGIN CATCH
    THROW;
END CATCH;
GO

--execute import scripts in parallel
EXEC dbo.ExecuteParallelImportScripts;
GO
16
répondu Dan Guzman 2016-10-19 10:57:45

Vous pouvez essayer de créer trois tâches et exécuter des scripts d'insertion en parallèle comme ci-dessous:

DECLARE @jobId BINARY(16)
EXEC msdb.dbo.sp_add_job @job_name=N'Job1', 
        @enabled=1,  
        @description=N'No description available.', 
        @job_id = @jobId OUTPUT

EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Insert into First Table', 
        @step_id=1, 
        @cmdexec_success_code=0, 
        @on_success_action=1, 
        @on_success_step_id=0, 
        @on_fail_action=2, 
        @on_fail_step_id=0, 
        @retry_attempts=0, 
        @retry_interval=0, 
        @os_run_priority=0, @subsystem=N'TSQL', 
        @command=N'--Insert script for first table', 
        @database_name=N'Test', 
        @flags=0

EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)'
GO

DECLARE @jobId BINARY(16)
EXEC msdb.dbo.sp_add_job @job_name=N'Job2', 
        @enabled=1,  
        @description=N'No description available.', 
        @job_id = @jobId OUTPUT

EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Insert into second Table', 
        @step_id=1, 
        @cmdexec_success_code=0, 
        @on_success_action=1, 
        @on_success_step_id=0, 
        @on_fail_action=2, 
        @on_fail_step_id=0, 
        @retry_attempts=0, 
        @retry_interval=0, 
        @os_run_priority=0, @subsystem=N'TSQL', 
        @command=N'--Insert script for second table', 
        @database_name=N'Test', 
        @flags=0

EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)'
GO

DECLARE @jobId BINARY(16)
EXEC msdb.dbo.sp_add_job @job_name=N'Job3', 
        @enabled=1,  
        @description=N'No description available.', 
        @job_id = @jobId OUTPUT

EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Insert into Third Table', 
        @step_id=1, 
        @cmdexec_success_code=0, 
        @on_success_action=1, 
        @on_success_step_id=0, 
        @on_fail_action=2, 
        @on_fail_step_id=0, 
        @retry_attempts=0, 
        @retry_interval=0, 
        @os_run_priority=0, @subsystem=N'TSQL', 
        @command=N'--Insert script for third table', 
        @database_name=N'Test', 
        @flags=0

EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)'
GO

EXEC msdb.dbo.sp_start_job N'Job1' ; --All will execute in parallel
EXEC msdb.dbo.sp_start_job N'Job2' ;
EXEC msdb.dbo.sp_start_job N'Job3' ;
9
répondu Kannan Kandasamy 2016-10-14 16:04:28

En supposant que vous souhaitez avoir la même valeur de date d'insertion pour toutes les insertions, définissez un paramètre de date défini sur la date actuelle comme indiqué.

DECLARE @InsertDate as date
SET @InsertDate = GetDate()

Ensuite, passez le paramètre insert date dans votre procédure insert stored et mettez à jour cette procédure stockée en conséquence pour utiliser cette entrée. Cela garantira que la même valeur de date d'insertion sera utilisée pour toutes les insertions.

EXEC dbo.InsertTables123 @p1 = @InsertDate

Le paramètre d'entrée @ InsertDate peut également être attribué manuellement si autre chose que la date actuelle en cas de besoin.

2
répondu JohnH 2016-10-13 14:10:22

Pour votre procédure, je suppose que vous avez tableName et file location comme paramètres.

Si vous avez un fichier volumineux qui a 3 millions d'enregistrements, vous devez diviser le fichier en 3 petits fichiers (si vous connaissez une autre langue que sql), après cela, vous pouvez ouvrir 3 consoles de ligne de commande SQL servers, et appeler la procédure dans chaque console. Cela rendra l'insertion parallèle. Ou vous connaissez d'autres langages de programmation, vous pouvez utiliser plusieurs threads d'appeler le procédure.

1
répondu Robin 2016-10-19 20:58:16

La structure et le contenu des trois tables sont-ils identiques? Si c'est le cas, utilisez la réplication transactionnelle/Fusion

Vous pouvez également créer un déclencheur sur la première table à insérer dans la deuxième et la troisième table

-3
répondu Marinus 2016-09-22 10:39:25