Parallel类是对线程的抽象,提供数据与任务的并行性。类定义了静态方法For和ForEach,使用多个任务来完成多个作业。Parallel.For和Parallel.ForEach方法在每次迭代的时候调用相同的代码,而Parallel.Invoke()方法允许同时调用不同的方法。Parallel.ForEach()方法用于数据的并行性,Parallel.Invoke()方法用于任务的并行性。

1、For()方法

  For()方法用于多次执行一个任务,可以并行运行迭代,但迭代的顺序并没指定。For()方法前两个参数为定义循环的开始和结束,第三个参数为Action<int>委托。方法的返回值是ParallelLoopResult结构,它提供了是否结束的信息。如以下循环方法,不能保证输出顺序: 

static void ParallelFor()
{
    ParallelLoopResult result =
        Parallel.For(0, 10, async i =>
            {
                Console.WriteLine("{0}, task: {1}, thread: {2}", i,
                   Task.CurrentId, Thread.CurrentThread.ManagedThreadId);

                await Task.Delay(10);//异步方法,用于释放线程供其他任务使用。完成后,可能看不到方法的输出,因为主(前台线)程结束,所有的后台线程也将结束
                Console.WriteLine("{0}, task: {1}, thread: {2}", i, Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
            });
    Console.WriteLine("Is completed: {0}", result.IsCompleted);
}

  异步功能虽然方便,但是知道后台发生了什么仍然重要,必须留意。

提前停止For()方法

  可以根据条件提前停止For()方法,而不必完成全部的迭代。,传入参数ParallelLoopState的对象,调用Break()方法或者Stop()方法。如调用Break()方法,当迭代值大于15的时候中断,但其他任务可以同时运行,有其他值的任务也可以运行。利用LowestBreakIteration属性可以忽略其他任务的结果:

static void ParallelFor()
{
    ParallelLoopResult result = Parallel.For(10, 40, (int i, ParallelLoopState pls) =>
         {
             Console.WriteLine("i: {0} task {1}", i, Task.CurrentId);
             Thread.Sleep(10);
             if (i > 15)
                 pls.Break();
         });
    Console.WriteLine("Is completed: {0}", result.IsCompleted);
    if (!result.IsCompleted)
        Console.WriteLine("lowest break iteration: {0}", result.LowestBreakIteration);
}

  For()方法可以使用几个线程执行循环。如果要对每个线程进行初始化,就需要使用到For<TLocal>(int, int, Func<TLocal>, Func<int, ParallelLoopState, TLocal, TLocal> , Action<TLocal>)方法。

  • 前两个参数是对应的循环起始和终止条件;
  • 第二个参数类型是Func<TLocal>,返回一个值,传递给第三个参数。
  • 第三个参数类型是Func<int, ParallelLoopState, TLocal, TLocal>,是循环体的委托,其内部的第一个参数是循环迭代,内部第二个参数允许停止迭代,内部第三个参数用于接收For()方法的前一个参数的返回值。循环体应当返回与For()循环泛型类型一致的值。
  • 第四个参数是指定的一个委托,用于执行相关后续操作。
static void ParallelFor()
{
    Parallel.For<string>(0, 20, () =>
      {
          // invoked once for each thread
          Console.WriteLine("init thread {0}, task {1}", Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
          return String.Format("t{0}", Thread.CurrentThread.ManagedThreadId);
      },
      (i, pls, str1) =>
      {
          // invoked for each member
          Console.WriteLine("body i {0} str1 {1} thread {2} task {3}", i, str1, Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
          Thread.Sleep(10);
          return String.Format("i {0}", i);
      },
      (str1) =>
      {
          // final action on each thread
          Console.WriteLine("finally {0}", str1);
      });
}

2、使用ForEach()方法循环

  ForEach()方法遍历实现了IEnumerable的集合,其方式类似于foreach语句,但是以异步方式遍历,没有确定的顺序。如果要中断循环,同样可以采用ParallelLoopState参数。ForEach<TSource>有许多泛型的重载方法。

static void ParallelForeach()
{
    string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", "eleven", "twelve" };

    ParallelLoopResult result = Parallel.ForEach<string>(data, s =>
             {
                 Console.WriteLine(s);
             });
    Parallel.ForEach<string>(data, (s, pls, l) =>
    {
        Console.WriteLine("{0} {1}", s, l);
    });
}

3、调用多个方法

  如果有多个任务并行,可以使用Parallel.Invoke()方法,它提供任务的并行性模式:

static void ParallelInvoke()
{
    Parallel.Invoke(Foo, Bar);
}

static void Foo()
{
    Console.WriteLine("foo");
}

static void Bar()
{
    Console.WriteLine("bar");
}    

4、For()方法的取消

  在For()方法的重载方法中,可以传递一个ParallelOptions类型的参数,利用此参数可以传递一个CancellationToken参数。使用CancellationTokenSource对象用于注册CancellationToken,并允许调用Cancel方法用于取消操作。

  一旦取消操作,For()方法就抛出一个OperationCanceledException类型的异常,使用CancellationToken可以注册取消操作时的信息。调用Register方法,传递一个在取消操作时调用的委托。通过取消操作,可以将其他的迭代操作在启动之前取消,但已经启动的迭代操作允许完成。取消操作是以协作方式进行的,以避免在取消迭代操作的中间泄露资源。

static void CancelParallelLoop()
{
    var cts = new CancellationTokenSource();
    cts.Token.ThrowIfCancellationRequested();
    cts.Token.Register(() => Console.WriteLine("** token cancelled"));
    // 在500ms后取消标记
    cts.CancelAfter(500);
    try
    {
        ParallelLoopResult result = Parallel.For(0, 100,
            new ParallelOptions()
            {
                CancellationToken = cts.Token
            },
               x =>
               {
                   Console.WriteLine("loop {0} started", x);
                   int sum = 0;
                   for (int i = 0; i < 100; i++)
                   {
                       Thread.Sleep(2);
                       sum += i;
                   }
                   Console.WriteLine("loop {0} finished", x);
               });
    }
    catch (OperationCanceledException ex)
    {
        Console.WriteLine(ex.Message);
    }
}

 

版权声明:本文为pilgrim原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/pilgrim/p/9356972.html