初识并行循环

并行循环主要用来处理数据并行的,如,同时对数组或列表中的多个数据执行相同的操作。

在C#编程中,我们使用并行类System.Threading.Tasks.Parallel提供的静态方法Parallel.ForParallel.ForEach来实现并行循环。从方法名可以看出,这两个方法是对常规循环forforeach的并行化。

简单用法

使用并行循环时需要传入循环范围(集合)和操作数据的委托Action<T>

Parallel.For(0, 100, i => { Console.WriteLine(i); });

Parallel.ForEach(Enumerable.Range(0, 100), i => { Console.WriteLine(i); });

使用场景

对于数据的处理需要耗费较长时间的循环适宜使用并行循环,利用多线程加快执行速度。

对于简单的迭代操作,且迭代范围较小,使用常规循环更好好,因为并行循环涉及到线程的创建、上下文切换和销毁,使用并行循环反而影响执行效率。

对于迭代操作简单但迭代范围很大的情况,我们可以对数据进行分区,再执行并行循环,减少线程数量。

循环结果

Parallel.ForParallel.ForEach方法的所有重载有着同样的返回值类型ParallelLoopResult,并行循环结果包含循环是否完成以及最低迭代次数两项信息。

下面的例子使用Parallel.ForEach展示了并行循环的结果。

ParallelLoopResult result = Parallel.ForEach(Enumerable.Range(0, 100), (i,loop) =>
{// 委托传入ParallelLoopState,用来控制循环执行
    Console.WriteLine(i + 1);
    Thread.Sleep(100);
    if (i == 30) // 此处设置循环停止的确切条件
    {
        loop.Break();
        //loop.Stop();
    }
});
Console.WriteLine($"{result.IsCompleted}-{result.LowestBreakIteration}");

值得一提的是,循环的Break()Stop()只能尽早地跳出或者停止循环,而不能立即停止。

取消循环操作

有时候,我们需要在中途取消循环操作,但又不知道确切条件是什么,比如用户触发的取消。这时候,可以利用循环的ParallelOptions传入一个CancellationToken,同时使用异常处理捕获OperationCanceledException以进行取消后的处理。下面是一个简单的例子。

/// <summary>
/// 取消通知者
/// </summary>
public static CancellationTokenSource CTSource { get; set; } = new CancellationTokenSource();

/// <summary>
/// 取消并行循环
/// </summary>
public static void CancelParallelLoopDemo()
{
    Task.Factory.StartNew(() =>
    {
        try
        {
            Parallel.ForEach(Enumerable.Range(0, 100), new ParallelOptions { CancellationToken = CTSource.Token },
                i =>
                {
                    Console.WriteLine(i + 1);
                    Thread.Sleep(1000);
                });
        }
        catch (OperationCanceledException oce)
        {
            Console.WriteLine(oce.Message);
        }
    });
}
static void Main(string[] args)
{
    ParallelDemoCancelParallelLoopDemo();
    Thread.Sleep(3000);
    ParallelDemoCTSource.Cancel();

    Console.ReadKey();
}

循环异常收集

并行循环执行过程中,可以捕获并收集迭代操作引发的异常,循环结束时抛出一个AggregateException异常,并将收集到的异常赋给它的内部异常集合InnerExceptions。外部使用时,捕获AggregateException,即可进行并行循环的异常处理。

下面的例子模拟了并行循环的异常抛出、收集及处理的过程。

/// <summary>
/// 捕获循环异常
/// </summary>
public static void CaptureTheLoopExceptions()
{
    ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();
    Parallel.ForEach(Enumerable.Range(0, 100), i =>
    {
        try
        {
            if (i % 10 == 0)
            {//模拟抛出异常
                throw new Exception($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] had thrown a exception. [{i}]");
            }
            Console.WriteLine(i + 1);
            Thread.Sleep(100);
        }
        catch (Exception ex)
        {//捕获并收集异常
            exceptions.Enqueue(ex);
        }
    });

    if (!exceptions.IsEmpty)
    {// 方法内部可直接进行异常处理,若需外部处理,将收集到的循环异常抛出
        throw new AggregateException(exceptions);
    }
}

外部处理方式

try
{
    ParallelDemo.CaptureTheLoopExceptions();
}
catch (AggregateException aex)
{
    foreach (Exception ex in aex.InnerExceptions)
    {// 模拟异常处理
        Console.WriteLine(ex.Message);
    }
}

分区并行处理

当循环操作很简单,迭代范围很大的时候,ParallelLoop提供一种分区的方式来优化循环性能。下面的例子展示了分区循环的使用,同时也能比较几种循环方式的执行效率。

/// <summary>
/// 分区并行处理,顺便比较各种循环的效率
/// </summary>
/// <param name="rangeSize">迭代范围</param>
/// <param name="opDuration">操作耗时</param>
public static void PartationParallelLoop(int rangeSize = 10000, int opDuration = 1)
{
    //PartationParallelLoopWithBuffer
    Stopwatch watch0 = Stopwatch.StartNew();
    Parallel.ForEach(Partitioner.Create(Enumerable.Range(0, rangeSize), EnumerablePartitionerOptions.None),
        i =>
        {//模拟操作
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
            Thread.Sleep(opDuration);
        });
    watch0.Stop();

    //PartationParallelLoopWithoutBuffer
    Stopwatch watch1 = Stopwatch.StartNew();
    Parallel.ForEach(Partitioner.Create(Enumerable.Range(0, rangeSize),EnumerablePartitionerOptions.NoBuffering),
        i =>
        {//模拟操作
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
            Thread.Sleep(opDuration);
        });
    watch1.Stop();

    //NormalParallelLoop
    Stopwatch watch2 = Stopwatch.StartNew();
    Parallel.ForEach(Enumerable.Range(0, rangeSize),
        i =>
        {//模拟操作
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
            Thread.Sleep(opDuration);
        });
    watch2.Stop();

    //NormalLoop
    Stopwatch watch3 = Stopwatch.StartNew();
    foreach (int i in Enumerable.Range(0, rangeSize))
    {//模拟操作
        Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
        Thread.Sleep(opDuration);
    }
    watch2.Stop();
            
    Console.WriteLine();
    Console.WriteLine($"PartationParallelLoopWithBuffer    => {watch0.ElapsedMilliseconds}ms");
    Console.WriteLine($"PartationParallelLoopWithoutBuffer => {watch1.ElapsedMilliseconds}ms");
    Console.WriteLine($"NormalParallelLoop                 => {watch2.ElapsedMilliseconds}ms");
    Console.WriteLine($"NormalLoop                         => {watch3.ElapsedMilliseconds}ms");
}

在 I7-7700HQ + 16GB 配置 VS调试模式下得到下面一组测试结果。

Loop Condition PartationParallelLoop WithBuffer PartationParallelLoop WithoutBuffer Normal ParallelLoop Normal Loop
10000,1 10527 11799 11155 19434
10000,1 9513 11442 11048 19354
10000,1 9871 11391 14782 19154
100,1000 9107 5951 5081 100363
100,1000 9086 5974 5187 100162
100,1000 9208 5125 5255 100239
100,1 350 439 243 200
100,1 390 227 166 198
100,1 466 225 84 197

应该根据不同的应用场景选择合适的循环策略,具体如何选择,朋友们可自行体会~

版权声明:本文为chenbaoshun原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/chenbaoshun/p/10572639.html