
.NET Parallel Programming: Data Parallelism


Contents

  • Parallel programming
  • Data parallelism

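Lately I've gotten a bit muddled about three concepts: multithreaded programming, parallel programming, and asynchronous programming. I had already studied asynchronous programming (see my earlier post "VS 2013 C# Asynchronous Programming: async await"), yet I suddenly realized I couldn't clearly explain how the three are related or how they differ. It's one of those things that's hard to put into words~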

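So I looked back over my own experience to sort out my thinking. When I first got into computers, my school still ran DOS and Windows 3.x; later the school switched to Windows 95, and after that I got my own desktop PC... In any case, the CPUs of that era were all single-core. Even with multiple threads, no matter how much a program looked as if it were running "simultaneously", execution was fundamentally sequential, because each piece of code had the CPU to itself while it ran and the operating system merely switched between threads. Later I sold the desktop and bought a laptop with a dual-core CPU. With multiple threads the situation was now different: they could truly run "at the same time", which is parallelism.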

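"Parallelism" is the goal, and "multithreaded programming" is the means we use to reach it. Multithreaded programming raises many problems, so, to simplify it, and because multi-core CPUs are increasingly common, many programming frameworks provide their own abstractions over threads in the form of classes and methods; programming with these abstractions is what we call parallel programming. Asynchronous programming is something else: async methods are intended to be non-blocking operations. The async and await keywords do not cause additional threads to be created, and async methods do not require multithreading, because an async method does not run on its own thread.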
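To make the distinction concrete, here is a minimal sketch (the class and method names are hypothetical, not from the original demo). The first method is parallel programming: Parallel.For spreads CPU-bound iterations across ThreadPool threads. The second is asynchronous programming: `await Task.Delay` hands control back to the caller while the delay is pending, so no thread sits blocked waiting for it.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class contrasting parallel and asynchronous code.
public static class ParallelVsAsync
{
    // Parallel programming: CPU-bound iterations are spread across
    // ThreadPool threads, so several cores can work at the same time.
    public static long SumOfSquaresParallel(int n)
    {
        long total = 0;
        object gate = new object();
        Parallel.For(0, n,
            () => 0L,                                   // per-worker subtotal
            (i, state, local) => local + (long)i * i,   // loop body
            local => { lock (gate) total += local; });  // merge subtotals
        return total;
    }

    // Asynchronous programming: 'await' returns control to the caller while
    // the delay is pending; no thread sits blocked for the duration.
    public static async Task<int> WaitThenAnswerAsync(int delayMs)
    {
        await Task.Delay(delayMs);
        return 42;
    }
}
```

The parallel method makes a computation finish sooner by using more cores; the async method does not speed anything up, it only avoids tying up a thread while waiting.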

Download Demo

Parallel Programming


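Multi-core CPUs are now very common, allowing multiple threads to execute at the same time. To take advantage of the hardware, you can parallelize your code so that the work is distributed across multiple processors. In the past, parallelization required low-level manipulation of threads and locks. Visual Studio 2010 and the .NET Framework 4 enhance support for parallel programming by providing a new runtime, new class library types, and new diagnostic tools. These features simplify parallel development, so that you can write efficient, fine-grained, and scalable parallel code in a natural idiom without having to work directly with threads or the thread pool.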
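To illustrate the "low-level threads and locks" style mentioned above, the sketch below (hypothetical names; it assumes a simple element-wise operation on an int[]) squares an array twice: once by splitting the range into one chunk per processor and managing Thread objects by hand, and once with a single Parallel.For call that leaves partitioning, scheduling, and waiting to the library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo: the same element-wise work written two ways.
public static class ThreadsVsTpl
{
    // "Old style": split [0, n) into one chunk per processor by hand,
    // start a Thread for each chunk, then Join them all.
    public static void SquareWithThreads(int[] data)
    {
        int workers = Environment.ProcessorCount;
        int chunk = (data.Length + workers - 1) / workers;
        var threads = new Thread[workers];
        for (int w = 0; w < workers; w++)
        {
            int start = w * chunk;                           // fresh copy per iteration
            int end = Math.Min(start + chunk, data.Length);  // last chunk may be short
            threads[w] = new Thread(() =>
            {
                for (int i = start; i < end; i++) data[i] = data[i] * data[i];
            });
            threads[w].Start();
        }
        foreach (var t in threads) t.Join();
    }

    // TPL style: Parallel.For partitions the range, schedules the work on
    // the ThreadPool, and returns only when every iteration has finished.
    public static void SquareWithParallelFor(int[] data)
    {
        Parallel.For(0, data.Length, i => data[i] = data[i] * data[i]);
    }
}
```

Both versions write only to their own indices, so neither needs a lock; the difference is how much thread bookkeeping you write yourself.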

The following figure gives a high-level overview of the parallel programming architecture in the .NET Framework 4.

[Figure: parallel programming architecture in the .NET Framework 4]

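The Task Parallel Library (TPL) is a set of public types and APIs in the System.Threading and System.Threading.Tasks namespaces. Its purpose is to make developers more productive by simplifying the process of adding parallelism and concurrency to applications. The TPL dynamically scales the degree of concurrency to use all available processors as efficiently as possible. In addition, the TPL handles the partitioning of the work, the scheduling of threads on the ThreadPool, cancellation support, state management, and other low-level details. By using the TPL, you can concentrate on the work your program is meant to do while maximizing the performance of your code.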

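Starting with the .NET Framework 4, the TPL is the preferred way to write multithreaded and parallel code. However, not all code is suitable for parallelization. For example, if a loop performs only a small amount of work on each iteration, or it does not run for many iterations, the overhead of parallelization can make the code run more slowly. Furthermore, like any multithreaded code, parallelization adds complexity to program execution. Although the TPL simplifies multithreaded scenarios, you should have a basic understanding of threading concepts such as locks, deadlocks, and race conditions in order to use the TPL effectively.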
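As an illustration of the race conditions mentioned above, here is a minimal sketch (hypothetical class and method names) of three ways to sum 0..n-1 with Parallel.For. The first updates a shared variable without synchronization and can lose updates; the second is correct but performs an atomic Interlocked.Add on every iteration; the third uses the localInit/localFinally overload (the same pattern the PI example in this article uses), so the shared variable is touched only once per worker.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo of a race condition and two ways to avoid it.
public static class RaceDemo
{
    // WRONG: "total += i" is a read-modify-write on shared state. Concurrent
    // iterations can overwrite each other's updates, so the result may be
    // too small and may differ from run to run.
    public static long UnsafeSum(int n)
    {
        long total = 0;
        Parallel.For(0, n, i => { total += i; });
        return total;
    }

    // Correct but slow: every iteration does an atomic update on one variable.
    public static long InterlockedSum(int n)
    {
        long total = 0;
        Parallel.For(0, n, i => Interlocked.Add(ref total, i));
        return total;
    }

    // Correct and cheaper: each worker keeps a private subtotal, and the
    // shared variable is updated once per worker when it finishes.
    public static long ThreadLocalSum(int n)
    {
        long total = 0;
        Parallel.For(0, n,
            () => 0L,                                     // localInit
            (i, state, local) => local + i,               // body
            local => Interlocked.Add(ref total, local));  // localFinally
        return total;
    }
}
```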

Data Parallelism


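We can parallelize over data, which simply means performing the same operation concurrently on each element of a collection; we can also parallelize tasks and dataflow. This article focuses on data parallelism.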

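The TPL supports data parallelism through the System.Threading.Tasks.Parallel class, which provides parallel implementations of for and foreach loops. You write the loop logic for Parallel.For or Parallel.ForEach much as you would write a sequential loop. You do not have to create threads or queue work items, and in basic loops, as long as iterations do not update shared state, you do not have to take locks. The TPL handles all the low-level work.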
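For a side-by-side comparison, the sketch below (hypothetical names, assuming an element-wise square root over a double[]) shows a sequential for loop and its Parallel.For counterpart. The loop body moves into a lambda that receives the index; because each iteration writes only its own output slot, no lock is needed.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo: the same loop written sequentially and with Parallel.For.
public static class LoopShapes
{
    // Sequential version: the familiar for loop.
    public static double[] SqrtSequential(double[] input)
    {
        var output = new double[input.Length];
        for (int i = 0; i < input.Length; i++)
        {
            output[i] = Math.Sqrt(input[i]);
        }
        return output;
    }

    // Parallel version: (0, input.Length) replaces the loop header and the
    // body becomes a lambda over the index. Each iteration writes a distinct
    // element of 'output', so iterations never touch shared state.
    public static double[] SqrtParallel(double[] input)
    {
        var output = new double[input.Length];
        Parallel.For(0, input.Length, i =>
        {
            output[i] = Math.Sqrt(input[i]);
        });
        return output;
    }
}
```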

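The System.Threading.Tasks.Parallel class has three methods, For, ForEach, and Invoke, each with many overloads. Rather than describing the methods one by one, the examples below show how to use them for parallel programming and compare their performance with sequential execution.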
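The examples in this article exercise For and ForEach. For completeness, here is a minimal sketch of Parallel.Invoke (the class and method names are hypothetical): it runs several independent actions, possibly in parallel, and returns only after all of them have finished. Each action writes to a different local variable, so no synchronization is needed.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical demo of Parallel.Invoke.
public static class InvokeDemo
{
    // Computes three independent statistics concurrently. Parallel.Invoke
    // blocks until every action has completed, so the locals are safe to read.
    public static (int Max, int Min, double Average) Stats(int[] data)
    {
        int max = 0, min = 0;
        double avg = 0;
        Parallel.Invoke(
            () => max = data.Max(),
            () => min = data.Min(),
            () => avg = data.Average());
        return (max, min, avg);
    }
}
```

Invoke suits a small, fixed number of unrelated operations; for the same operation over many items, For or ForEach is the better fit.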

Computing PI

Compare the performance of computing PI sequentially, in parallel, and in parallel with range partitioning.

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;
 
namespace ComputePi
{
  class Program
  {
    const int num_steps = 100000000;
 
    static void Main(string[] args)
    {
      Time(() => SerialPi());
      Time(() => ParallelPi());
      Time(() => ParallelPartitionerPi());
 
      Console.WriteLine("Press any keys to Exit.");
      Console.ReadLine();
    }
    /// <summary>
    /// Times the execution of a function and outputs both the elapsed time and the function's result.
    /// </summary>
    static void Time<T>(Func<T> work)
    {
      var sw = Stopwatch.StartNew();
      var result = work();
      Console.WriteLine(sw.Elapsed + ": " + result);
    }
 
    /// <summary>
    /// Estimates the value of PI using a for loop.
    /// </summary>
    static double SerialPi()
    {
      double sum = 0.0;
      double step = 1.0 / (double)num_steps;
      for (int i = 0; i < num_steps; i++)
      {
        double x = (i + 0.5) * step;
        sum = sum + 4.0 / (1.0 + x * x);
      }
      return step * sum;
    }
 
    /// <summary>
    /// Estimates the value of PI using a Parallel.For.
    /// </summary>
    static double ParallelPi()
    {
      double sum = 0.0;
      double step = 1.0 / (double)num_steps;
      object monitor = new object();
      Parallel.For(0, num_steps, () => 0.0, (i, state, local) =>
      {
        double x = (i + 0.5) * step;
        return local + 4.0 / (1.0 + x * x);
      }, local => { lock (monitor) sum += local; });
      return step * sum;
    }
 
 
    /// <summary>
    /// Estimates the value of PI using a Parallel.ForEach and a range partitioner.
    /// </summary>
    static double ParallelPartitionerPi()
    {
      double sum = 0.0;
      double step = 1.0 / (double)num_steps;
      object monitor = new object();
      Parallel.ForEach(Partitioner.Create(0, num_steps), () => 0.0, (range, state, local) =>
      {
        for (int i = range.Item1; i < range.Item2; i++)
        {
          double x = (i + 0.5) * step;
          local += 4.0 / (1.0 + x * x);
        }
        return local;
      }, local => { lock (monitor) sum += local; });
      return step * sum;
    }
  }
}
//RESULT:
//00:00:00.4358850: 3.14159265359043
//00:00:00.4523856: 3.14159265358987
//00:00:00.1435475: 3.14159265358979
//Press any keys to Exit.


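When the body of a For loop is small, the loop may run more slowly than the equivalent sequential loop. That is why the parallel PI computation takes about as long as the sequential one (roughly 0.45 s versus 0.44 s in the run above): the overhead of partitioning the data and of invoking a delegate on every loop iteration cancels out the gain. To handle such cases, the Partitioner class provides the Partitioner.Create method, which lets you supply a sequential loop for the delegate body, so that the delegate is invoked once per partition instead of once per iteration. As a result, the partitioned parallel version is much faster (about 0.14 s in the same run).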
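To control the chunk size yourself, Partitioner.Create also has an overload that takes a range size, Partitioner.Create(fromInclusive, toExclusive, rangeSize). The sketch below is the ParallelPartitionerPi method from the listing above with the range size exposed as a parameter (the class and parameter names are hypothetical). Larger ranges mean fewer delegate invocations, while too few ranges can leave some cores idle; the best value depends on the machine and the workload.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical variant of ParallelPartitionerPi with a configurable range size.
public static class ChunkedPi
{
    public static double EstimatePi(int numSteps, int rangeSize)
    {
        double step = 1.0 / numSteps;
        double sum = 0.0;
        object gate = new object();
        // Partitioner.Create(from, to, rangeSize) yields ranges [Item1, Item2)
        // of at most rangeSize indices; the body delegate runs once per range.
        Parallel.ForEach(
            Partitioner.Create(0, numSteps, rangeSize),
            () => 0.0,
            (range, state, local) =>
            {
                // Ordinary sequential loop over one range.
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    double x = (i + 0.5) * step;
                    local += 4.0 / (1.0 + x * x);
                }
                return local;
            },
            local => { lock (gate) sum += local; });
        return step * sum;
    }
}
```

Timing EstimatePi with a few different range sizes, using the Time helper from the listing above, is one way to find a good value for a given machine.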

Matrix Multiplication

Compare the performance of sequential and parallel matrix multiplication.

using System;
using System.Diagnostics;
using System.Threading.Tasks;
 
namespace DataParallelismDemo
{
  class Program
  {
    /// <summary>
    /// Sequential_Loop
    /// </summary>
    /// <param name="matA"></param>
    /// <param name="matB"></param>
    /// <param name="result"></param>
    static void MultiplyMatricesSequential(double[,] matA, double[,] matB, double[,] result)
    {
      int matACols = matA.GetLength(1);
      int matBCols = matB.GetLength(1);
      int matARows = matA.GetLength(0);
 
      for (int i = 0; i < matARows; i++)
      {
        for (int j = 0; j < matBCols; j++)
        {
          for (int k = 0; k < matACols; k++)
          {
            result[i, j] += matA[i, k] * matB[k, j];
          }
        }
      }
    }
 
    /// <summary>
    /// Parallel_Loop
    /// </summary>
    /// <param name="matA"></param>
    /// <param name="matB"></param>
    /// <param name="result"></param>
    static void MultiplyMatricesParallel(double[,] matA, double[,] matB, double[,] result)
    {
      int matACols = matA.GetLength(1);
      int matBCols = matB.GetLength(1);
      int matARows = matA.GetLength(0);
 
      // A basic matrix multiplication.
      // Parallelize the outer loop to partition the source array by rows.
      Parallel.For(0, matARows, i =>
      {
        for (int j = 0; j < matBCols; j++)
        {
          // Use a temporary to improve parallel performance.
          double temp = 0;
          for (int k = 0; k < matACols; k++)
          {
            temp += matA[i, k] * matB[k, j];
          }
          result[i, j] = temp;
        }
      }); // Parallel.For
    }
 
    static void Main(string[] args)
    {
      // Set up matrices. Use small values to better view 
      // result matrix. Increase the counts to see greater 
      // speedup in the parallel loop vs. the sequential loop.
      int colCount = 180;
      int rowCount = 2000;
      int colCount2 = 270;
      double[,] m1 = InitializeMatrix(rowCount, colCount);
      double[,] m2 = InitializeMatrix(colCount, colCount2);
      double[,] result = new double[rowCount, colCount2];
 
      // First do the sequential version.
      Console.WriteLine("Executing sequential loop...");
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();
 
      MultiplyMatricesSequential(m1, m2, result);
      stopwatch.Stop();
      Console.WriteLine("Sequential loop time in milliseconds: {0}", stopwatch.ElapsedMilliseconds);
 
      // For the skeptics.
      OfferToPrint(rowCount, colCount2, result);
 
      // Reset timer and results matrix. 
      stopwatch.Reset();
      result = new double[rowCount, colCount2];
 
      // Do the parallel loop.
      Console.WriteLine("Executing parallel loop...");
      stopwatch.Start();
      MultiplyMatricesParallel(m1, m2, result);
      stopwatch.Stop();
      Console.WriteLine("Parallel loop time in milliseconds: {0}", stopwatch.ElapsedMilliseconds);
      OfferToPrint(rowCount, colCount2, result);
 
      // Keep the console window open in debug mode.
      Console.WriteLine("Press any key to exit.");
      Console.ReadKey();
    }
 
    /// <summary>
    /// Creates a rows x cols matrix filled with random integers in [0, 100).
    /// </summary>
    /// <param name="rows"></param>
    /// <param name="cols"></param>
    /// <returns></returns>
    static double[,] InitializeMatrix(int rows, int cols)
    {
      double[,] matrix = new double[rows, cols];
 
      Random r = new Random();
      for (int i = 0; i < rows; i++)
      {
        for (int j = 0; j < cols; j++)
        {
          matrix[i, j] = r.Next(100);
        }
      }
      return matrix;
    }
 
    private static void OfferToPrint(int rowCount, int colCount, double[,] matrix)
    {
      Console.WriteLine("Computation complete. Print results? y/n");
      char c = Console.ReadKey().KeyChar;
      if (c == 'y' || c == 'Y')
      {
        Console.WindowWidth = 180;
        Console.WriteLine();
        for (int x = 0; x < rowCount; x++)
        {
          Console.WriteLine("ROW {0}: ", x);
          for (int y = 0; y < colCount; y++)
          {
            Console.Write("{0:#.##} ", matrix[x, y]);
          }
          Console.WriteLine();
        }
 
      }
    }
  }
}
//RESULT:
//Executing sequential loop...
//Sequential loop time in milliseconds: 1168
//Computation complete. Print results? y/n
//nExecuting parallel loop...
//Parallel loop time in milliseconds: 360
//Computation complete. Print results? y/n
//nPress any key to exit.
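Printing a 2000 x 270 result matrix "for the skeptics" is hard to check by eye. A minimal alternative, sketched below with hypothetical names, is to compute both results and compare them element by element. Because the demo fills the matrices with integers below 100, every product and partial sum is an exact integer in double precision, and both versions add the terms for each element in the same order, so an exact comparison (tolerance 0) is appropriate; against an implementation that sums in a different order, use a small tolerance instead.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper for checking parallel results against sequential ones.
public static class MatrixCheck
{
    // Reference implementation: plain triple loop, C = A x B.
    public static double[,] MultiplySequential(double[,] a, double[,] b)
    {
        int rows = a.GetLength(0), inner = a.GetLength(1), cols = b.GetLength(1);
        var c = new double[rows, cols];
        for (int i = 0; i < rows; i++)
            for (int j = 0; j < cols; j++)
            {
                double temp = 0;
                for (int k = 0; k < inner; k++) temp += a[i, k] * b[k, j];
                c[i, j] = temp;
            }
        return c;
    }

    // Same computation with the outer (row) loop parallelized.
    public static double[,] MultiplyParallel(double[,] a, double[,] b)
    {
        int rows = a.GetLength(0), inner = a.GetLength(1), cols = b.GetLength(1);
        var c = new double[rows, cols];
        Parallel.For(0, rows, i =>
        {
            for (int j = 0; j < cols; j++)
            {
                double temp = 0;
                for (int k = 0; k < inner; k++) temp += a[i, k] * b[k, j];
                c[i, j] = temp;
            }
        });
        return c;
    }

    // True if both matrices have the same shape and every pair of
    // corresponding entries differs by at most 'tolerance'.
    public static bool AreEqual(double[,] x, double[,] y, double tolerance = 0.0)
    {
        if (x.GetLength(0) != y.GetLength(0) || x.GetLength(1) != y.GetLength(1)) return false;
        for (int i = 0; i < x.GetLength(0); i++)
            for (int j = 0; j < x.GetLength(1); j++)
                if (Math.Abs(x[i, j] - y[i, j]) > tolerance) return false;
        return true;
    }
}
```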
 


Copying all PNG images in a directory to another directory (rotating each by 180 degrees)

using System;
//using System.Collections.Generic;
//using System.Linq;
//using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Configuration;
 
namespace MovePics
{
  class Program
  {
    protected static string PIC_PATH = ConfigurationManager.AppSettings["PicPath"].ToString();
    protected static string NEW_PIC_PATH = ConfigurationManager.AppSettings["NewPicPath"].ToString();
    static void Main(string[] args)
    {
      // A simple source for demonstration purposes. Modify this path as necessary.
      string[] files = System.IO.Directory.GetFiles(PIC_PATH, "*.png");
      System.IO.Directory.CreateDirectory(NEW_PIC_PATH);
 
      // Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
      Parallel.ForEach(files, currentFile =>
      {
        // The more computational work you do here, the greater 
        // the speedup compared to a sequential foreach loop.
        string filename = System.IO.Path.GetFileName(currentFile);
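        // Dispose the Bitmap when done; otherwise each iteration leaks a GDI+
        // handle and keeps the source file locked until the finalizer runs.
        using (System.Drawing.Bitmap bitmap = new System.Drawing.Bitmap(currentFile))
        {
          bitmap.RotateFlip(System.Drawing.RotateFlipType.Rotate180FlipNone);
          bitmap.Save(System.IO.Path.Combine(NEW_PIC_PATH, filename));
        }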
 
        // Peek behind the scenes to see how work is parallelized.
        // But be aware: Thread contention for the Console slows down parallel loops!!!
        Console.WriteLine("Processing {0} on thread {1}", filename,
                  Thread.CurrentThread.ManagedThreadId);
 
      } //close lambda expression
         ); //close method invocation
 
      // Keep the console window open in debug mode.
      Console.WriteLine("Processing complete. Press any key to exit.");
      Console.ReadKey();
    }
  }
}
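The comment in the loop above warns that contention for the Console slows parallel loops. One way around it, sketched below with hypothetical names, is to record each item's progress in a thread-safe ConcurrentBag<T> inside the loop and print only after the loop has finished. The `process` parameter is an assumption standing in for the real per-file work, such as the bitmap rotation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: parallel per-file work without Console calls in the loop.
public static class QuietForEach
{
    public static List<string> ProcessAll(IEnumerable<string> paths, Action<string> process)
    {
        // ConcurrentBag is safe to Add to from many threads at once.
        var log = new ConcurrentBag<string>();
        Parallel.ForEach(paths, path =>
        {
            process(path);
            log.Add(string.Format("Processed {0} on thread {1}",
                Path.GetFileName(path), Thread.CurrentThread.ManagedThreadId));
        });
        // The bag's enumeration order is unspecified; sort for stable output.
        var lines = new List<string>(log);
        lines.Sort(StringComparer.Ordinal);
        return lines;
    }
}
```

After the call, a plain `foreach (var line in lines) Console.WriteLine(line);` prints the same diagnostics without slowing the parallel loop itself.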


Listing all files in a directory, including its subdirectories

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace TraverseTreeParallelForEach
{
  class Program
  {
    static void Main(string[] args)
    {
      try
      {
        TraverseTreeParallelForEach(@"C:\Program Files", (f) =>
        {
          // Exceptions are no-ops.
          try
          {
            // Do nothing with the data except read it.
            byte[] data = File.ReadAllBytes(f);
          }
          catch (FileNotFoundException) { }
          catch (IOException) { }
          catch (UnauthorizedAccessException) { }
          catch (SecurityException) { }
          // Display the filename.
          Console.WriteLine(f);
        });
      }
      catch (ArgumentException)
      {
        Console.WriteLine(@"The directory 'C:\Program Files' does not exist.");
      }
 
      // Keep the console window open.
      Console.WriteLine("Press any key to exit.");
      Console.ReadKey();
    }
 
    public static void TraverseTreeParallelForEach(string root, Action<string> action)
    {
      //Count of files traversed and timer for diagnostic output
      int fileCount = 0;
      var sw = Stopwatch.StartNew();
 
      // Determine whether to parallelize file processing on each folder based on processor count.
      int procCount = System.Environment.ProcessorCount;
 
      // Data structure to hold names of subfolders to be examined for files.
      Stack<string> dirs = new Stack<string>();
 
      if (!Directory.Exists(root))
      {
        throw new ArgumentException();
      }
      dirs.Push(root);
 
      while (dirs.Count > 0)
      {
        string currentDir = dirs.Pop();
        string[] subDirs = { };
        string[] files = { };
 
        try
        {
          subDirs = Directory.GetDirectories(currentDir);
        }
        // Thrown if we do not have discovery permission on the directory.
        catch (UnauthorizedAccessException e)
        {
          Console.WriteLine(e.Message);
          continue;
        }
        // Thrown if another process has deleted the directory after we retrieved its name.
        catch (DirectoryNotFoundException e)
        {
          Console.WriteLine(e.Message);
          continue;
        }
 
        try
        {
          files = Directory.GetFiles(currentDir);
        }
        catch (UnauthorizedAccessException e)
        {
          Console.WriteLine(e.Message);
          continue;
        }
        catch (DirectoryNotFoundException e)
        {
          Console.WriteLine(e.Message);
          continue;
        }
        catch (IOException e)
        {
          Console.WriteLine(e.Message);
          continue;
        }
 
        // Execute in parallel if there are enough files in the directory.
        // Otherwise, execute sequentially. Files are opened and processed
        // synchronously but this could be modified to perform async I/O.
        try
        {
          if (files.Length < procCount)
          {
            foreach (var file in files)
            {
              action(file);
              fileCount++;
            }
          }
          else
          {
            Parallel.ForEach(files, () => 0, (file, loopState, localCount) =>
            {
              action(file);
              return (int)++localCount;
            },
                     (c) =>
                     {
                       Interlocked.Add(ref fileCount, c);
                     });
          }
        }
        catch (AggregateException ae)
        {
          ae.Handle((ex) =>
          {
            if (ex is UnauthorizedAccessException)
            {
              // Here we just output a message and go on.
              Console.WriteLine(ex.Message);
              return true;
            }
            // Handle other exceptions here if necessary...
 
            return false;
          });
        }
 
        // Push the subdirectories onto the stack for traversal.
        // This could also be done before handling the files.
        foreach (string str in subDirs)
          dirs.Push(str);
      }
 
      // For diagnostic purposes.
      Console.WriteLine("Processed {0} files in {1} milliseconds", fileCount, sw.ElapsedMilliseconds);
    }
  }
}


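In addition, Parallel.For and Parallel.ForEach have several overloads that let you stop or break loop execution, monitor the state of the loop on other threads, maintain thread-local state, finalize thread-local objects, control the degree of concurrency, and more. The helper types that enable this functionality include ParallelLoopState, ParallelOptions, ParallelLoopResult, CancellationToken, and CancellationTokenSource.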
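As a sketch of some of these helper types (the class and method names are hypothetical): ParallelLoopState.Break asks the loop to stop running iterations beyond the current one at the earliest convenient point, while all lower-numbered iterations still run, and ParallelLoopResult.LowestBreakIteration reports the lowest index that called Break. ParallelOptions caps the degree of parallelism and carries a CancellationToken from a CancellationTokenSource; cancelling the token makes Parallel.For throw OperationCanceledException.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo of ParallelLoopState, ParallelLoopResult and ParallelOptions.
public static class LoopControl
{
    // Break(): iterations above the breaking index need not run, but every
    // lower index is still executed, so the lowest breaking index is the
    // first negative element. Returns null if there is none.
    public static long? FirstNegativeIndex(int[] data)
    {
        ParallelLoopResult result = Parallel.For(0, data.Length, (i, state) =>
        {
            if (data[i] < 0) state.Break();
        });
        // LowestBreakIteration is null when Break() was never called.
        return result.LowestBreakIteration;
    }

    // Runs at most two iterations concurrently and cancels the loop from inside
    // iteration 'cancelAt'. Returns true if the loop was cancelled.
    public static bool RunUntilCancelled(int n, int cancelAt)
    {
        using (var cts = new CancellationTokenSource())
        {
            var options = new ParallelOptions
            {
                MaxDegreeOfParallelism = 2,
                CancellationToken = cts.Token
            };
            try
            {
                Parallel.For(0, n, options, i =>
                {
                    if (i == cancelAt) cts.Cancel();
                });
                return false;   // completed without cancellation
            }
            catch (OperationCanceledException)
            {
                return true;    // the token was cancelled during the loop
            }
        }
    }
}
```

In real code the CancellationTokenSource would usually be cancelled from outside the loop, for example by a UI button or a timeout.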

References


  • Microsoft Developer Network: Parallel Programming
  • Microsoft Developer Network: Data Parallelism

 


Download Samples for Parallel Programming with the .NET Framework