在面对相互独立的数据或者相互独立的任务时,也许正是Parallel登场的时候。 比如说有一个盒子的集合,分别让盒子旋转一定的角度。 void RotateBox(IEnumerable<Box> boxes, float degree){ Parall ...
在面对相互独立的数据或者相互独立的任务时,也许正是Parallel登场的时候。
比如说有一个盒子的集合,分别让盒子旋转一定的角度。
void RotateBox(IEnumerable<Box> boxes, float degree)
{
Parallel.ForEach(boxes, box => box.Rotate(degree));
}
如果并行任务中的一个任务出现异常,需要结束出问题的任务呢?
Parallel.ForEach为我们提供了一个重载方法,可以控制任务是否继续。
void RotateBoxes(IEnumerable<Box> boxes)
{
Parallel.ForEach(boxes, (box, state) => {
if(!box.IsInvertible)
{
state.Stop();
}
else
{
box.Invert();
}
});
}
如果想取消整个并行任务呢?
Parallel.ForEach也为我们提供一个重载方法,可以接收一个ParallelOption的形参,通过设置ParallelOption的CancellationToken属性来取消整个并行过程。
void RotateBoxes(IEnumerable<Box> boxes,float degrees, CancellationToken token)
{
Paralle.ForEach(boxes,
new ParallelOptions{CancellationToken=token},
box => box.Rotate(degrees));
}
在使用的时候,一般先定义个全局CancellationTokenSource类型的变量。
static CancellationTokenSource token = new CancellationTokenSource();
然后,在某个并行任务中设置取消。
token.Cancel();
最后,再把这个token赋值给以上方法的CancellationToken属性。
各个并行任务如何共享状态,共享一个变量呢?
int InvertBoxes(IEnumerable<Box> boxes)
{
object mutex = new object();//用来锁
int nonInvertibleCount = 0; //各任务共享的变量
Paralle.ForEach(boxes, box =>{
if(box.IsInvertible){
box.Invert();
}
else
{
lock(mutex)
{
++nonInvertibleCount;
}
}
});
return nonInvertibleCount;
}
可见,对于各并行get='_blank'>线程共享的变量,需要加一个线程锁,以防止多个线程同时操作共享变量。
另外,Parallel.ForEach提供了一个重载,其中localFinally形参接收一个委托类型,通过该委托让并行任务共享一个变量。比如:
static int ParallelSum(IEnumerable<int> values)
{
object mutex = new object();
int result = 0;
Parallel.ForEach(
source: values,
LocalInit: () => 0,
body: (item, state, localVlaue) => localValue + item,
localFinally: localValue => {
lock(mutex)
{
result += localValue;
}
}
);
return result;
}
当然,也别忘了PLINQ也支持并行:
static int ParalleSum(IEnumerable<int> values)
{
return values.AsParallel().Sum();
}
PLINQ的Aggregate方法也可实现:
static int ParallelSum(IEnumerable<int> values)
{
return values.AsParallel().Aggregate(
seed: 0,
func: (sum, item) => sum + item;
);
}
以上,是对相互独立数据的处理。
那么,如何处理相互独立的多个任务呢?
通过Parallel.Invoke方法可以实现。
static voiD ProcessArray(double[] array)
{
Parallel.Invoke(
() => ProcessPartialArray(array, 0, array.Length/2),
() => ProcessPartialArray(array, array.Length/2, array.Length)
);
}
static void ProcessPartialArray(double[] array, int begin, int end)
{}
使用Parallel.Invoke方法还可以让一个Action或这方法执行多次。
static void Do20(Action action)
{
//让某个Action执行20次
Action[] actions = Enumerable.Repeat(action, 20).ToArray();
Parallel.Invoke(actions);
}
Parallel.Invoke方法也提供了重载,接收ParallelOption类型的形参,用来控制取消整个并行过程。
static void Do20(Action action)
{
//让某个Action执行20次
Action[] actions = Enumerable.Repeat(action, 20).ToArray();
Parallel.Invoke(new ParallelOptions{CancellationToken = token},actions);
}
参考资料:C#并发编程经典实例
原标题:并行编程中的取消任务、共享状态,等等
关键词:
*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们:
admin#shaoqun.com
(#换成@)。