你的位置:首页 > ASP.net教程

[ASP.net教程]并行编程中的取消任务、共享状态,等等


 

在面对相互独立的数据或者相互独立的任务时,也许正是Parallel登场的时候。

 

比如说有一个盒子的集合,分别让盒子旋转一定的角度。

 

void RotateBox(IEnumerable<Box> boxes, float degree)
{
  Parallel.ForEach(boxes, box => box.Rotate(degree));
}

 

如果并行任务中的一个任务出现异常,需要结束出问题的任务呢?

 

Parallel.ForEach为我们提供了一个重载方法,可以控制任务是否继续。

 

void RotateBoxes(IEnumerable<Box> boxes)
{
  Parallel.ForEach(boxes, (box, state) => {
    if(!box.IsInvertible)
    {
      state.Stop();
    }
    else
    {
      box.Invert();
    }
  });
}

 

如果想取消整个并行任务呢?

 

Parallel.ForEach也为我们提供一个重载方法,可以接收一个ParallelOption的形参,通过设置ParallelOption的CancellationToken属性来取消整个并行过程。

 

void RotateBoxes(IEnumerable<Box> boxes,float degrees, CancellationToken token)
{
  Paralle.ForEach(boxes, 
  new ParallelOptions{CancellationToken=token},
  box => box.Rotate(degrees));
}

 

在使用的时候,一般先定义个全局CancellationTokenSource类型的变量。

 

static CancellationTokenSource token = new CancellationTokenSource();

 

然后,在某个并行任务中设置取消。

 

token.Cancel();

 

最后,再把这个token赋值给以上方法的CancellationToken属性。

 

各个并行任务如何共享状态,共享一个变量呢?

 

int InvertBoxes(IEnumerable<Box> boxes)
{
  object mutex = new object();//用来锁
  int nonInvertibleCount = 0; //各任务共享的变量
  Paralle.ForEach(boxes, box =>{
    if(box.IsInvertible){
      box.Invert();
    }
    else
    {
      lock(mutex)
      {
        ++nonInvertibleCount;
      }
    }
  });
  return nonInvertibleCount;
}

 

可见,对于各并行线程共享的变量,需要加一个线程锁,以防止多个线程同时操作共享变量。

 

另外,Parallel.ForEach提供了一个重载,其中localFinally形参接收一个委托类型,通过该委托让并行任务共享一个变量。比如:

 

static int ParallelSum(IEnumerable<int> values)
{
  object mutex = new object();
  int result = 0;
  Parallel.ForEach(
    source: values,
    LocalInit: () => 0,
    body: (item, state, localVlaue) => localValue + item,
    localFinally: localValue => {
      lock(mutex)
      {
        result += localValue;
      }
    }
  );
  return result;
}

 

当然,也别忘了PLINQ也支持并行:

 

static int ParalleSum(IEnumerable<int> values)
{
  return values.AsParallel().Sum();
}

 

PLINQ的Aggregate方法也可实现:

 

static int ParallelSum(IEnumerable<int> values)
{
  return values.AsParallel().Aggregate(
    seed: 0,
    func: (sum, item) => sum + item;
  );
}

 

以上,是对相互独立数据的处理。

 

那么,如何处理相互独立的多个任务呢?

 

通过Parallel.Invoke方法可以实现。

 

static voiD ProcessArray(double[] array)
{
  Parallel.Invoke(
    () => ProcessPartialArray(array, 0, array.Length/2),
    () => ProcessPartialArray(array, array.Length/2, array.Length)
  );
}
static void ProcessPartialArray(double[] array, int begin, int end)
{}

 

使用Parallel.Invoke方法还可以让一个Action或这方法执行多次。

 

static void Do20(Action action)
{
  //让某个Action执行20次
  Action[] actions = Enumerable.Repeat(action, 20).ToArray();
  Parallel.Invoke(actions);
}

 

Parallel.Invoke方法也提供了重载,接收ParallelOption类型的形参,用来控制取消整个并行过程。

 

static void Do20(Action action)
{
  //让某个Action执行20次
  Action[] actions = Enumerable.Repeat(action, 20).ToArray();
  Parallel.Invoke(new ParallelOptions{CancellationToken = token},actions);
}

 

参考资料:C#并发编程经典实例