Comment insérer des données parellel dans trois tables différentes
J'ai un stored procedure
qui va insérer l'essentiel des dossiers, maintenant est-il possible d'insérer des données dans 3 tables
en parallèle;
- Premier
table
insertion de 1 million d'enregistrements. - deuxième
table
insertion de 1,5 million d'enregistrements. - Troisième
table
insertion de 500 enregistrements
Selon mes connaissances, l'insertion de la procédure se passe l'une après l'autre.
Alors, comment puis-je implémenter le chargement en parallèle?
5 réponses
Les instructions
S'exécutent de manière synchrone dans un lot T-SQL. Pour exécuter plusieurs instructions de manière asynchrone et en parallèle à partir d'une procédure stockée, vous devez utiliser plusieurs connexions de base de données simultanées. Notez que la partie délicate avec l'exécution asynchrone est de déterminer non seulement quand toutes les tâches sont terminées, mais aussi si elles ont réussi ou échoué.
Méthode 1: paquet SSIS
Créez un package SSIS pour exécuter les 3 instructions SQL en parallèle. Dans SQL 2012 et versions ultérieures, exécutez le package en utilisant les procédures stockées du catalogue SSIS. Avant SQL 2012, vous devrez créer un travail D'Agent SQL pour le package et le lancer avec sp_start_job.
Vous devez vérifier l'état d'exécution SSIS ou L'état de la tâche SQL Agent pour déterminer l'achèvement et le résultat du succès/échec.
Méthode 2: Powershell et agent SQL
Exécute une tâche D'Agent SQL qui exécute un script Powershell qui exécute les requêtes en parallèle à L'aide de L'arrière-plan Powershell emplois (commande Start-Job). Le script peut renvoyer un code de sortie, zéro pour le succès et non-zéro pour l'échec, de sorte que SQL Agent peut déterminer s'il a réussi. Vérifiez L'État du travail de L'Agent SQL pour déterminer l'achèvement et le résultat du succès/échec.
Méthode 3: plusieurs tâches D'Agent SQL
Exécute simultanément plusieurs tâches D'Agent SQL, chacune avec une étape de tâche T-SQL contenant le script d'importation. Vérifiez L'État de chaque tâche SQL Agent pour déterminer l'achèvement et le succès / échec résultat.
Méthode 4: Courtier De Services Utilisez un processus activé par la file d'attente pour exécuter les scripts d'importation en parallèle. Cela peut être obtus si vous n'avez pas utilisé Service broker avant et il est important de suivre les modèles vérifiés. J'ai inclus un exemple pour vous aider à démarrer (remplacez THROW par RAISERROR pour pre-SQL 2012). Service Broker doit être activé dans la base de données, ce qui est activé par défaut mais désactivé après une restauration ou une attache.
USE YourDatabase;
Go
--create proc that will be automatically executed (activated) when requests are waiting
CREATE PROC dbo.ExecuteTSqlTask
AS
SET NOCOUNT ON;
DECLARE
@TSqlJobConversationHandle uniqueidentifier = NEWID()
, @TSqlExecutionRequestMessage xml
, @TSqlExecutionResultMessage xml
, @TSqlExecutionResult varchar(10)
, @TSqlExecutionResultDetails nvarchar(MAX)
, @TSqlScript nvarchar(MAX)
, @TSqlTaskName sysname
, @RowsAffected int
, @message_type_name sysname;
WHILE 1 = 1
BEGIN
--get the next task to execute
WAITFOR (
RECEIVE TOP (1)
@TSqlJobConversationHandle = conversation_handle
, @TSqlExecutionRequestMessage = CAST(message_body AS xml)
, @message_type_name = message_type_name
FROM dbo.TSqlExecutionQueue
), TIMEOUT 1000;
IF @@ROWCOUNT = 0
BEGIN
--no work to do - exit
BREAK;
END;
IF @message_type_name = N'TSqlExecutionRequest'
BEGIN
--get task name and script
SELECT
@TSqlTaskName = @TSqlExecutionRequestMessage.value('(/TSqlTaskName)[1]', 'sysname')
, @TSqlScript = @TSqlExecutionRequestMessage.value('(/TSqlScript)[1]', 'nvarchar(MAX)');
--execute script
BEGIN TRY
EXEC sp_executesql @TSqlScript;
SET @RowsAffected = @@ROWCOUNT;
SET @TSqlExecutionResult = 'Completed';
SET @TSqlExecutionResultDetails = CAST(@RowsAffected as varchar(10)) + ' rows affected';
END TRY
BEGIN CATCH
SET @TSqlExecutionResult = 'Erred';
SET @TSqlExecutionResultDetails =
'Msg ' + CAST(ERROR_NUMBER() AS varchar(10))
+ ', Level ' + CAST(ERROR_SEVERITY() AS varchar(2))
+ ', State ' + CAST(ERROR_STATE() AS varchar(10))
+ ', Line ' + CAST(ERROR_LINE() AS varchar(10))
+ ': ' + ERROR_MESSAGE();
END CATCH;
--send execution result back to initiator
SET @TSqlExecutionResultMessage = '<TSqlTaskName /><TSqlExecutionResult /><TSqlExecutionResultDetails />';
SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlTaskName")} into (/TSqlTaskName)[1] ');
SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlExecutionResult")} into (/TSqlExecutionResult)[1] ');
SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlExecutionResultDetails")} into (/TSqlExecutionResultDetails)[1] ');
SEND ON CONVERSATION @TSqlJobConversationHandle
MESSAGE TYPE TSqlExecutionResult
(@TSqlExecutionResultMessage);
END
ELSE
BEGIN
IF @message_type_name = N'TSqlJobComplete'
BEGIN
--service has ended conversation so we're not going to get any more execution requests
END CONVERSATION @TSqlJobConversationHandle;
END
ELSE
BEGIN
END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type received by ExecuteTSqlTask';
RAISERROR('Unexpected message type received (%s) by ExecuteTSqlTask', 16, 1, @message_type_name);
END;
END;
END;
GO
CREATE QUEUE dbo.TSqlResultQueue;
CREATE QUEUE dbo.TSqlExecutionQueue
WITH STATUS=ON,
ACTIVATION (
STATUS = ON
, PROCEDURE_NAME = dbo.ExecuteTSqlTask
, MAX_QUEUE_READERS = 3 --max number of concurrent activated proc instances
, EXECUTE AS OWNER
);
CREATE MESSAGE TYPE TSqlExecutionRequest VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE TSqlExecutionResult VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE TSqlJobComplete VALIDATION = WELL_FORMED_XML;
CREATE CONTRACT TSqlExecutionContract (
TSqlExecutionRequest SENT BY INITIATOR
, TSqlJobComplete SENT BY INITIATOR
, TSqlExecutionResult SENT BY TARGET
);
CREATE SERVICE TSqlJobService ON QUEUE dbo.TSqlResultQueue ([TSqlExecutionContract]);
CREATE SERVICE TSqlExecutorService ON QUEUE dbo.TSqlExecutionQueue ([TSqlExecutionContract]);
GO
CREATE PROC dbo.ExecuteParallelImportScripts
AS
SET NOCOUNT ON;
DECLARE
@TSqlJobConversationHandle uniqueidentifier
, @TSqlExecutionRequestMessage xml
, @TSqlExecutionResultMessage xml
, @TSqlExecutionResult varchar(10)
, @TSqlExecutionResultDetails nvarchar(MAX)
, @TSqlTaskName sysname
, @CompletedCount int = 0
, @ErredCount int = 0
, @message_type_name sysname;
DECLARE @TsqlTask TABLE(
TSqlTaskName sysname NOT NULL PRIMARY KEY
, TSqlScript nvarchar(MAX) NOT NULL
);
BEGIN TRY
--insert a row for each import task
INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript)
VALUES(N'ImportScript1', N'INSERT INTO dbo.Table1 SELECT * FROM dbo.Table1Staging;');
INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript)
VALUES(N'ImportScript2', N'INSERT INTO dbo.Table2 SELECT * FROM dbo.Table2Staging;');
INSERT INTO @TsqlTask(TSqlTaskName, TSqlScript)
VALUES(N'ImportScript3', N'INSERT INTO dbo.Table3 SELECT * FROM dbo.Table3Staging;');
--start a conversation for this import process
BEGIN DIALOG CONVERSATION @TsqlJobConversationHandle
FROM SERVICE TSqlJobService
TO SERVICE 'TSqlExecutorService', 'CURRENT DATABASE'
ON CONTRACT TSqlExecutionContract
WITH ENCRYPTION = OFF;
--send import tasks to executor service for parallel execution
DECLARE JobTasks CURSOR LOCAL FAST_FORWARD FOR
SELECT (SELECT TSqlTaskName, TSqlScript
FROM @TsqlTask AS task
WHERE task.TSqlTaskName = job.TSqlTaskName
FOR XML PATH(''), TYPE) AS TSqlExecutionRequest
FROM @TsqlTask AS job;
OPEN JobTasks;
WHILE 1 = 1
BEGIN
FETCH NEXT FROM JobTasks INTO @TSqlExecutionRequestMessage;
IF @@FETCH_STATUS = -1 BREAK;
SEND ON CONVERSATION @TSqlJobConversationHandle
MESSAGE TYPE TSqlExecutionRequest
(@TSqlExecutionRequestMessage);
END;
CLOSE JobTasks;
DEALLOCATE JobTasks;
--get each parallel task execution result until all are complete
WHILE 1 = 1
BEGIN
--get next task result
WAITFOR (
RECEIVE TOP (1)
@TSqlExecutionResultMessage = CAST(message_body AS xml)
, @message_type_name = message_type_name
FROM dbo.TSqlResultQueue
WHERE conversation_handle = @TSqlJobConversationHandle
), TIMEOUT 1000;
IF @@ROWCOUNT <> 0
BEGIN
IF @message_type_name = N'TSqlExecutionResult'
BEGIN
--get result of import script execution
SELECT
@TSqlTaskName = @TSqlExecutionResultMessage.value('(/TSqlTaskName)[1]', 'sysname')
, @TSqlExecutionResult = @TSqlExecutionResultMessage.value('(/TSqlExecutionResult)[1]', 'varchar(10)')
, @TSqlExecutionResultDetails = COALESCE(@TSqlExecutionResultMessage.value('(/TSqlExecutionResultDetails)[1]', 'nvarchar(MAX)'), N'');
RAISERROR('Import task %s %s: %s', 0, 0, @TSqlTaskName, @TSqlExecutionResult, @TSqlExecutionResultDetails) WITH NOWAIT;
IF @TSqlExecutionResult = 'Completed'
BEGIN
SET @CompletedCount += 1;
END
ELSE
BEGIN
SET @ErredCount += 1;
END;
--remove task from tracking table after completion
DELETE FROM @TSqlTask
WHERE TSqlTaskName = @TSqlTaskName;
IF NOT EXISTS(SELECT 1 FROM @TsqlTask)
BEGIN
--all tasks are done - send TSqlJobComplete message to instruct executor service to end conversation
SEND ON CONVERSATION @TSqlJobConversationHandle
MESSAGE TYPE TSqlJobComplete;
END
END
ELSE
BEGIN
IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
--executor service has ended conversation so we're done
END CONVERSATION @TSqlJobConversationHandle;
BREAK;
END
ELSE
BEGIN
END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type received by ExecuteParallelInserts';
RAISERROR('Unexpected message type received (%s) by ExecuteParallelInserts', 16, 1, @message_type_name);
END;
END
END;
END;
RAISERROR('Import processing completed. CompletedCount=%d, ErredCount=%d.', 0, 0, @CompletedCount, @ErredCount);
END TRY
BEGIN CATCH
THROW;
END CATCH;
GO
--execute import scripts in parallel
EXEC dbo.ExecuteParallelImportScripts;
GO
Vous pouvez essayer de créer trois tâches et exécuter des scripts d'insertion en parallèle comme ci-dessous:
DECLARE @jobId BINARY(16)
EXEC msdb.dbo.sp_add_job @job_name=N'Job1',
@enabled=1,
@description=N'No description available.',
@job_id = @jobId OUTPUT
EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Insert into First Table',
@step_id=1,
@cmdexec_success_code=0,
@on_success_action=1,
@on_success_step_id=0,
@on_fail_action=2,
@on_fail_step_id=0,
@retry_attempts=0,
@retry_interval=0,
@os_run_priority=0, @subsystem=N'TSQL',
@command=N'--Insert script for first table',
@database_name=N'Test',
@flags=0
EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)'
GO
DECLARE @jobId BINARY(16)
EXEC msdb.dbo.sp_add_job @job_name=N'Job2',
@enabled=1,
@description=N'No description available.',
@job_id = @jobId OUTPUT
EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Insert into second Table',
@step_id=1,
@cmdexec_success_code=0,
@on_success_action=1,
@on_success_step_id=0,
@on_fail_action=2,
@on_fail_step_id=0,
@retry_attempts=0,
@retry_interval=0,
@os_run_priority=0, @subsystem=N'TSQL',
@command=N'--Insert script for second table',
@database_name=N'Test',
@flags=0
EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)'
GO
DECLARE @jobId BINARY(16)
EXEC msdb.dbo.sp_add_job @job_name=N'Job3',
@enabled=1,
@description=N'No description available.',
@job_id = @jobId OUTPUT
EXEC msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Insert into Third Table',
@step_id=1,
@cmdexec_success_code=0,
@on_success_action=1,
@on_success_step_id=0,
@on_fail_action=2,
@on_fail_step_id=0,
@retry_attempts=0,
@retry_interval=0,
@os_run_priority=0, @subsystem=N'TSQL',
@command=N'--Insert script for third table',
@database_name=N'Test',
@flags=0
EXEC msdb.dbo.sp_add_jobserver @job_id = @jobId, @server_name = N'(local)'
GO
EXEC msdb.dbo.sp_start_job N'Job1' ; --All will execute in parallel
EXEC msdb.dbo.sp_start_job N'Job2' ;
EXEC msdb.dbo.sp_start_job N'Job3' ;
En supposant que vous souhaitez avoir la même valeur de date d'insertion pour toutes les insertions, définissez un paramètre de date défini sur la date actuelle comme indiqué.
DECLARE @InsertDate as date
SET @InsertDate = GetDate()
Ensuite, passez le paramètre insert date dans votre procédure insert stored et mettez à jour cette procédure stockée en conséquence pour utiliser cette entrée. Cela garantira que la même valeur de date d'insertion sera utilisée pour toutes les insertions.
EXEC dbo.InsertTables123 @p1 = @InsertDate
Le paramètre d'entrée @ InsertDate peut également être attribué manuellement si autre chose que la date actuelle en cas de besoin.
Pour votre procédure, je suppose que vous avez tableName et file location comme paramètres.
Si vous avez un fichier volumineux qui a 3 millions d'enregistrements, vous devez diviser le fichier en 3 petits fichiers (si vous connaissez une autre langue que sql), après cela, vous pouvez ouvrir 3 consoles de ligne de commande SQL servers, et appeler la procédure dans chaque console. Cela rendra l'insertion parallèle. Ou vous connaissez d'autres langages de programmation, vous pouvez utiliser plusieurs threads d'appeler le procédure.
La structure et le contenu des trois tables sont-ils identiques? Si c'est le cas, utilisez la réplication transactionnelle/Fusion
Vous pouvez également créer un déclencheur sur la première table à insérer dans la deuxième et la troisième table