你的位置:首页 > 软件开发 > ASP.net > .NET 并行编程——数据并行

.NET 并行编程——数据并行

发布时间:2015-04-21 18:00:49
本文内容 并行编程 数据并行 最近,对多线程编程,并行编程,异步编程,这三个概念有点晕了,怎么突然觉得,自己有点不明白这三者之间有什么联系和区别了呢? 因此,回顾了一下个人经历,屡屡思路~我刚接触计算机时,还是学校的 DOS 和 win 3.x,之后,学校换了 Wi ...

本文内容

  • 并行编程
  • 数据并行

最近,对多get='_blank'>线程编程,并行编程,异步编程,这三个概念有点晕了,怎么突然觉得,自己有点不明白这三者之间有什么联系和区别了呢?

因此,回顾了一下个人经历,屡屡思路~我刚接触计算机时,还是学校的 DOS 和 win 3.x,之后,学校换了 Windows 95,再之后,我有自己的台式机……但是无论如何,那时的电脑 CPU 都是单核的,即便采用多线程,无论看上多么像“同时”执行的,本质上还是顺序的,因为代码段是独占 CPU 的;之后,我把台式机卖了,买了个笔记本电脑,CPU 是双核的,如果用多线程,那情况就不同了,能达到正真的“同时”执行——并行。

“并行”是目的,为了实现这个目的,我们采用“多线程编程”这个手段,而我们知道,多线程编程涉及的问题比较多,为了简化多线程编程,加之多核 CPU 越来越普遍,于是框架本身就提供了对多线程的封装,比如一些类和方法——并行编程;而异步编程,异步并不会创建新的线程,它只是运行在不同的线程,这样就不会阻塞当前的线程。

下载 Demo

并行编程


许多个人计算机和工作站都有两个或四个内核(即 CPU),使多个线程能够同时执行。 在不久的将来,计算机预期会有更多的内核。 为了利用当今和未来的硬件,您可以对代码进行并行化,以将工作分摊在多个处理器上。 过去,并行化需要线程和锁的低级操作。

Visual Studio 2010 和 .NET Framework 4 提供了新的运行时、新的类库类型以及新的诊断工具,从而增强了对并行编程的支持。 这些功能简化了并行开发,使您能够通过固有方法编写高效、细化且可伸缩的并行代码,而不必直接处理线程或线程池。

下图从较高层面上概述了 .NET Framework 4 中的并行编程体系结构。

.NET 并行编程——数据并行

任务并行库(The Task Parallel Library,TPL)是 System.Threading 和 System.Threading.Tasks 空间中的一组公共类型和 API。 TPL 的目的是通过简化将并行和并发添加到应用程序的过程来提高开发人员的工作效率。 TPL 动态缩放并发的程度以最有效地使用所有可用的处理器。 此外,TPL 还处理工作分区、ThreadPool 上的线程调度、取消支持、状态管理以及其他低级别的细节操作。 通过使用 TPL,你可以在将精力集中于程序要完成的工作,同时最大程度地提高代码的性能。

从 .NET Framework 4 开始,TPL 是编写多线程代码和并行代码的首选方法。 但是,并不是所有代码都适合并行化;例如,如果某个循环在每次迭代时只执行少量工作,或它在很多次迭代时都不运行,那么并行化的开销可能导致代码运行更慢。 此外,像任何多线程代码一样,并行化会增加程序执行的复杂性。 尽管 TPL 简化了多线程方案,但我们建议你对线程处理概念(例如,锁、死锁和争用条件)进行基本的了解,以便能够有效地使用 TPL。

数据并行


我们可以对数据进行并行,简单地说,对集合中的每个数据同时(并行)执行相同的操作,当然也可以对任务和数据流进行并行。

本文主要描述数据并行。在数据并行操作中,将对源集合进行分区,以便多个线程能够同时对不同的片段进行操作。任务并行库 (TPL) 支持通过 System.Threading.Tasks.Parallel 类实现的数据并行。 此类提供 for 和 foreach 循环基于方法的并行实现。为 Parallel.For 或 Parallel.ForEach 循环编写循环逻辑与编写顺序循环非常类似。您不必创建线程或队列工作项。在基本的循环中,您不必采用锁。TPL 将为您处理所有低级别工作。

计算 PI

比较顺序计算 PI、并行计算 PI 和并行分区计算 PI 的性能。

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;
 
namespace ComputePi
{
  class Program
  {
    const int num_steps = 100000000;
 
    static void Main(string[] args)
    {
      Time(() => SerialPi());
      Time(() => ParallelPi());
      Time(() => ParallelPartitionerPi());
 
      Console.WriteLine("Press any keys to Exit.");
      Console.ReadLine();
    }
    /// <summary>
    /// Times the execution of a function and outputs both the elapsed time and the function's result.
    /// </summary>
    static void Time<T>(Func<T> work)
    {
      var sw = Stopwatch.StartNew();
      var result = work();
      Console.WriteLine(sw.Elapsed + ": " + result);
    }
 
    /// <summary>
    /// Estimates the value of PI using a for loop.
    /// </summary>
    static double SerialPi()
    {
      double sum = 0.0;
      double step = 1.0 / (double)num_steps;
      for (int i = 0; i < num_steps; i++)
      {
        double x = (i + 0.5) * step;
        sum = sum + 4.0 / (1.0 + x * x);
      }
      return step * sum;
    }
 
    /// <summary>
    /// Estimates the value of PI using a Parallel.For.
    /// </summary>
    static double ParallelPi()
    {
      double sum = 0.0;
      double step = 1.0 / (double)num_steps;
      object monitor = new object();
      Parallel.For(0, num_steps, () => 0.0, (i, state, local) =>
      {
        double x = (i + 0.5) * step;
        return local + 4.0 / (1.0 + x * x);
      }, local => { lock (monitor) sum += local; });
      return step * sum;
    }
 
 
    /// <summary>
    /// Estimates the value of PI using a Parallel.ForEach and a range partitioner.
    /// </summary>
    static double ParallelPartitionerPi()
    {
      double sum = 0.0;
      double step = 1.0 / (double)num_steps;
      object monitor = new object();
      Parallel.ForEach(Partitioner.Create(0, num_steps), () => 0.0, (range, state, local) =>
      {
        for (int i = range.Item1; i < range.Item2; i++)
        {
          double x = (i + 0.5) * step;
          local += 4.0 / (1.0 + x * x);
        }
        return local;
      }, local => { lock (monitor) sum += local; });
      return step * sum;
    }
  }
}
//RESULT:
//00:00:00.4358850: 3.14159265359043
//00:00:00.4523856: 3.14159265358987
//00:00:00.1435475: 3.14159265358979
//Press any keys to Exit.

矩阵相乘

比较顺序和并行进行矩阵乘法的性能。

using System;
using System.Diagnostics;
using System.Threading.Tasks;
 
namespace DataParallelismDemo
{
  class Program
  {
    /// <summary>
    /// Sequential_Loop
    /// </summary>
    /// <param name="matA"></param>
    /// <param name="matB"></param>
    /// <param name="result"></param>
    static void MultiplyMatricesSequential(double[,] matA, double[,] matB, double[,] result)
    {
      int matACols = matA.GetLength(1);
      int matBCols = matB.GetLength(1);
      int matARows = matA.GetLength(0);
 
      for (int i = 0; i < matARows; i++)
      {
        for (int j = 0; j < matBCols; j++)
        {
          for (int k = 0; k < matACols; k++)
          {
            result[i, j] += matA[i, k] * matB[k, j];
          }
        }
      }
    }
 
    /// <summary>
    /// Parallel_Loop
    /// </summary>
    /// <param name="matA"></param>
    /// <param name="matB"></param>
    /// <param name="result"></param>
    static void MultiplyMatricesParallel(double[,] matA, double[,] matB, double[,] result)
    {
      int matACols = matA.GetLength(1);
      int matBCols = matB.GetLength(1);
      int matARows = matA.GetLength(0);
 
      // A basic matrix multiplication.
      // Parallelize the outer loop to partition the source array by rows.
      Parallel.For(0, matARows, i =>
      {
        for (int j = 0; j < matBCols; j++)
        {
          // Use a temporary to improve parallel performance.
          double temp = 0;
          for (int k = 0; k < matACols; k++)
          {
            temp += matA[i, k] * matB[k, j];
          }
          result[i, j] = temp;
        }
      }); // Parallel.For
    }
 
    static void Main(string[] args)
    {
      // Set up matrices. Use small values to better view 
      // result matrix. Increase the counts to see greater 
      // speedup in the parallel loop vs. the sequential loop.
      int colCount = 180;
      int rowCount = 2000;
      int colCount2 = 270;
      double[,] m1 = InitializeMatrix(rowCount, colCount);
      double[,] m2 = InitializeMatrix(colCount, colCount2);
      double[,] result = new double[rowCount, colCount2];
 
      // First do the sequential version.
      Console.WriteLine("Executing sequential loop...");
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();
 
      MultiplyMatricesSequential(m1, m2, result);
      stopwatch.Stop();
      Console.WriteLine("Sequential loop time in milliseconds: {0}", stopwatch.ElapsedMilliseconds);
 
      // For the skeptics.
      OfferToPrint(rowCount, colCount2, result);
 
      // Reset timer and results matrix. 
      stopwatch.Reset();
      result = new double[rowCount, colCount2];
 
      // Do the parallel loop.
      Console.WriteLine("Executing parallel loop...");
      stopwatch.Start();
      MultiplyMatricesParallel(m1, m2, result);
      stopwatch.Stop();
      Console.WriteLine("Parallel loop time in milliseconds: {0}", stopwatch.ElapsedMilliseconds);
      OfferToPrint(rowCount, colCount2, result);
 
      // Keep the console window open in debug mode.
      Console.WriteLine("Press any key to exit.");
      Console.ReadKey();
    }
 
    /// <summary>
    /// 生成矩阵
    /// </summary>
    /// <param name="rows"></param>
    /// <param name="cols"></param>
    /// <returns></returns>
    static double[,] InitializeMatrix(int rows, int cols)
    {
      double[,] matrix = new double[rows, cols];
 
      Random r = new Random();
      for (int i = 0; i < rows; i++)
      {
        for (int j = 0; j < cols; j++)
        {
          matrix[i, j] = r.Next(100);
        }
      }
      return matrix;
    }
 
    private static void OfferToPrint(int rowCount, int colCount, double[,] matrix)
    {
      Console.WriteLine("Computation complete. Print results? y/n");
      char c = Console.ReadKey().KeyChar;
      if (c == 'y' || c == 'Y')
      {
        Console.WindowWidth = 180;
        Console.WriteLine();
        for (int x = 0; x < rowCount; x++)
        {
          Console.WriteLine("ROW {0}: ", x);
          for (int y = 0; y < colCount; y++)
          {
            Console.Write("{0:#.##} ", matrix[x, y]);
          }
          Console.WriteLine();
        }
 
      }
    }
  }
}
//RESULST:
//Executing sequential loop...
//Sequential loop time in milliseconds: 1168
//Computation complete. Print results? y/n
//nExecuting parallel loop...
//Parallel loop time in milliseconds: 360
//Computation complete. Print results? y/n
//nPress any key to exit.
 

把目录中的全部图片复制到另一个目录

using System;
//using System.Collections.Generic;
//using System.Linq;
//using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Configuration;
 
namespace MovePics
{
  class Program
  {
    protected static string PIC_PATH = ConfigurationManager.AppSettings["PicPath"].ToString();
    protected static string NEW_PIC_PATH = ConfigurationManager.AppSettings["NewPicPath"].ToString();
    static void Main(string[] args)
    {
      // A simple source for demonstration purposes. Modify this path as necessary.
      string[] files = System.IO.Directory.GetFiles(PIC_PATH, "*.png");
      System.IO.Directory.CreateDirectory(NEW_PIC_PATH);
 
      // Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
      Parallel.ForEach(files, currentFile =>
      {
        // The more computational work you do here, the greater 
        // the speedup compared to a sequential foreach loop.
        string filename = System.IO.Path.GetFileName(currentFile);
        System.Drawing.Bitmap bitmap = new System.Drawing.Bitmap(currentFile);
 
        bitmap.RotateFlip(System.Drawing.RotateFlipType.Rotate180FlipNone);
        bitmap.Save(System.IO.Path.Combine(NEW_PIC_PATH, filename));
 
        // Peek behind the scenes to see how work is parallelized.
        // But be aware: Thread contention for the Console slows down parallel loops!!!
        Console.WriteLine("Processing {0} on thread {1}", filename,
                  Thread.CurrentThread.ManagedThreadId);
 
      } //close lambda expression
         ); //close method invocation
 
      // Keep the console window open in debug mode.
      Console.WriteLine("Processing complete. Press any key to exit.");
      Console.ReadKey();
    }
  }
}

列出指定目录中的所有文件,包括其子目录

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace TraverseTreeParallelForEach
{
  class Program
  {
    static void Main(string[] args)
    {
      try
      {
        TraverseTreeParallelForEach(@"C:\Program Files", (f) =>
        {
          // Exceptions are no-ops.
          try
          {
            // Do nothing with the data except read it.
            byte[] data = File.ReadAllBytes(f);
          }
          catch (FileNotFoundException) { }
          catch (IOException) { }
          catch (UnauthorizedAccessException) { }
          catch (SecurityException) { }
          // Display the filename.
          Console.WriteLine(f);
        });
      }
      catch (ArgumentException)
      {
        Console.WriteLine(@"The directory 'C:\Program Files' does not exist.");
      }
 
      // Keep the console window open.
      Console.WriteLine("Press any key to exit.");
      Console.ReadKey();
    }
 
    public static void TraverseTreeParallelForEach(string root, Action<string> action)
    {
      //Count of files traversed and timer for diagnostic output
      int fileCount = 0;
      var sw = Stopwatch.StartNew();
 
      // Determine whether to parallelize file processing on each folder based on processor count.
      int procCount = System.Environment.ProcessorCount;
 
      // Data structure to hold names of subfolders to be examined for files.
      Stack<string> dirs = new Stack<string>();
 
      if (!Directory.Exists(root))
      {
        throw new ArgumentException();
      }
      dirs.Push(root);
 
      while (dirs.Count > 0)
      {
        string currentDir = dirs.Pop();
        string[] subDirs = { };
        string[] files = { };
 
        try
        {
          subDirs = Directory.GetDirectories(currentDir);
        }
        // Thrown if we do not have discovery permission on the directory.
        catch (UnauthorizedAccessException e)
        {
          Console.WriteLine(e.Message);
          continue;
        }
        // Thrown if another process has deleted the directory after we retrieved its name.
        catch (DirectoryNotFoundException e)
        {
          Console.WriteLine(e.Message);
          continue;
        }
 
        try
        {
          files = Directory.GetFiles(currentDir);
        }
        catch (UnauthorizedAccessException e)
        {
          Console.WriteLine(e.Message);
          continue;
        }
        catch (DirectoryNotFoundException e)
        {
          Console.WriteLine(e.Message);
          continue;
        }
        catch (IOException e)
        {
          Console.WriteLine(e.Message);
          continue;
        }
 
        // Execute in parallel if there are enough files in the directory.
        // Otherwise, execute sequentially.Files are opened and processed
        // synchronously but this could be modified to perform async I/O.
        try
        {
          if (files.Length < procCount)
          {
            foreach (var file in files)
            {
              action(file);
              fileCount++;
            }
          }
          else
          {
            Parallel.ForEach(files, () => 0, (file, loopState, localCount) =>
            {
              action(file);
              return (int)++localCount;
            },
                     (c) =>
                     {
                       Interlocked.Add(ref fileCount, c);
                     });
          }
        }
        catch (AggregateException ae)
        {
          ae.Handle((ex) =>
          {
            if (ex is UnauthorizedAccessException)
            {
              // Here we just output a message and go on.
              Console.WriteLine(ex.Message);
              return true;
            }
            // Handle other exceptions here if necessary...
 
            return false;
          });
        }
 
        // Push the subdirectories onto the stack for traversal.
        // This could also be done before handing the files.
        foreach (string str in subDirs)
          dirs.Push(str);
      }
 
      // For diagnostic purposes.
      Console.WriteLine("Processed {0} files in {1} milleseconds", fileCount, sw.ElapsedMilliseconds);
    }
  }
}

Parallel.For 和 Parallel.ForEach 方法都有若干重载,利用这些重载可以停止或中断循环执行、监视其他线程上循环的状态、维护线程本地状态、完成线程本地对象、控制并发程度,等等。 启用此功能的帮助器类型包括 ParallelLoopState、ParallelOptions、ParallelLoopResult、 CancellationToken 和 CancellationTokenSource。

参考资料


  • Microsoft Developer Network 并行编程
  • Microsft Developer Network 数据并行

 

下载 Demo

下载 Samples forParalle Pragmming with .net framework


原标题:.NET 并行编程——数据并行

关键词:.NET

*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们: admin#shaoqun.com (#换成@)。