Limiter le nombre de threads parallèles en C#
J'écris un programme C# pour générer et télécharger un demi-million de fichiers via FTP. Je veux traiter 4 fichiers en parallèle car la machine a 4 cœurs et la génération de fichiers prend beaucoup plus de temps. Est-il possible de convertir L'exemple Powershell suivant en C#? Ou y a-t-il un meilleur cadre tel que le cadre D'acteur en C# (comme F# MailboxProcessor)?
$maxConcurrentJobs = 3;
# Read the input and queue it up
$jobInput = get-content .input.txt
$queue = [System.Collections.Queue]::Synchronized( (New-Object System.Collections.Queue) )
foreach($item in $jobInput)
{
$queue.Enqueue($item)
}
# Function that pops input off the queue and starts a job with it
function RunJobFromQueue
{
if( $queue.Count -gt 0)
{
$j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue()
Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null
}
}
# Start up to the max number of concurrent jobs
# Each job will take care of running the rest
for( $i = 0; $i -lt $maxConcurrentJobs; $i++ )
{
RunJobFromQueue
}
Mise à Jour:
La connexion au serveur FTP distant peut être lente donc, je veux limiter le traitement de téléchargement FTP.
5 réponses
En supposant que vous construisez ceci avec le TPL, vous pouvez définir les ParallelOptions .MaxDegreesOfParallelism à ce que vous voulez qu'il soit.
En Parallèle.Pour pour un exemple de code.
La bibliothèque parallèle de tâches est votre ami ici. Voir ce lien qui décrit ce qui est à votre disposition. Fondamentalement, framework 4 est livré avec ce qui optimise ces threads groupés essentiellement en arrière-plan au nombre de processeurs sur la machine en cours d'exécution.
Peut-être quelque chose du genre:
ParallelOptions options = new ParallelOptions();
options.MaxDegreeOfParallelism = 4;
Puis dans votre boucle quelque chose comme:
Parallel.Invoke(options,
() => new WebClient().Upload("http://www.linqpad.net", "lp.html"),
() => new WebClient().Upload("http://www.jaoo.dk", "jaoo.html"));
Si vous utilisez. Net 4.0, vous pouvez utiliser la bibliothèque parallèle
En supposant que vous parcourez le demi-million de fichiers, vous pouvez "parallèle" l'itération en utilisant un Foreach parallèle par exemple ou vous pouvez jeter un oeil à PLinq Voici une comparaison entre les deux
Essentiellement, vous allez vouloir créer une Action ou une tâche pour chaque fichier à télécharger, les mettre dans une liste, puis traiter cette liste, en limitant le nombre qui peut être traité en parallèle.
Mon article de blog montre comment le faire à la fois avec des tâches et avec des Actions, et fournit un exemple de projet que vous pouvez télécharger et exécuter pour voir les deux en action.
Avec Des Actions
Si vous utilisez des Actions, vous pouvez utiliser le parallèle. Net intégré.Appeler la fonction. Ici nous le limitons à courir au plus 4 threads en parallèle.
var listOfActions = new List<Action>();
foreach (var file in files)
{
var localFile = file;
// Note that we create the Task here, but do not start it.
listOfTasks.Add(new Task(() => UploadFile(localFile)));
}
var options = new ParallelOptions {MaxDegreeOfParallelism = 4};
Parallel.Invoke(options, listOfActions.ToArray());
Cette option ne prend pas en charge async, et je suppose que vous êtes la fonction FileUpload sera, donc vous pouvez utiliser l'exemple de tâche ci-dessous.
Avec Des Tâches
Avec les tâches, il n'y a pas de fonction intégrée. Cependant, vous pouvez utiliser celui que je fournis sur mon blog.
/// <summary>
/// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
{
await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
}
/// <summary>
/// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
/// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
{
// Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
var tasks = tasksToRun.ToList();
using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
{
var postTaskTasks = new List<Task>();
// Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));
// Start running each task.
foreach (var task in tasks)
{
// Increment the number of tasks currently running and wait if too many are running.
await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
task.Start();
}
// Wait for all of the provided tasks to complete.
// We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
await Task.WhenAll(postTaskTasks.ToArray());
}
}
Et puis en créant votre liste de tâches et en appelant la fonction pour les exécuter, avec par exemple un maximum de 4 simultanés à la fois, vous pourriez faire ce:
var listOfTasks = new List<Task>();
foreach (var file in files)
{
var localFile = file;
// Note that we create the Task here, but do not start it.
listOfTasks.Add(new Task(async () => await UploadFile(localFile)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4);
En outre, comme cette méthode prend en charge async, elle ne bloquera pas le thread de L'interface utilisateur comme en utilisant Parallel.Invoquer ou en Parallèle.ForEach le ferait.
J'ai codé ci-dessous la technique où j'utilise BlockingCollection en tant que gestionnaire de nombre de threads. Il est assez simple à mettre en œuvre et gère le travail. Il accepte simplement les objets de tâche et ajoute une valeur entière à la liste de blocage, augmentant le nombre de threads en cours d'exécution de 1. Lorsque le thread se termine, il désqueue l'objet et libère le bloc lors de l'opération d'ajout pour les tâches à venir.
public class BlockingTaskQueue
{
private BlockingCollection<int> threadManager { get; set; } = null;
public bool IsWorking
{
get
{
return threadManager.Count > 0 ? true : false;
}
}
public BlockingTaskQueue(int maxThread)
{
threadManager = new BlockingCollection<int>(maxThread);
}
public async Task AddTask(Task task)
{
Task.Run(() =>
{
Run(task);
});
}
private bool Run(Task task)
{
try
{
threadManager.Add(1);
task.Start();
task.Wait();
return true;
}
catch (Exception ex)
{
return false;
}
finally
{
threadManager.Take();
}
}
}