当前位置:必发365电子游戏 > Web前端 > 【必发娱乐365登录】Parallel.ForEach 的四线程并行管理
【必发娱乐365登录】Parallel.ForEach 的四线程并行管理
2019-12-19

必发365娱乐官网,简介

当需求为多核机器实行优化的时候,最佳先检查下你的次第是还是不是有管理能够分割开来开展并行管理。(举例,有二个有影响的人的多寡集合,此中的因素需求一个叁个扩充相互独立的耗费时间总计)。

.net framework 4 中提供了 Parallel.ForEach 和 PLINQ 来增加援救我们开展并行管理,本文研讨这二者的异样及适用的情景。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;
using System.Threading.Tasks;
using System.Collections.Concurrent;

namespace ConsoleApplication1
{
    class Program
    {
        /*
         * 测试分析结果
         * Parallel.For、Parallel.Foreach发挥出了平行运算的优势,将效率提高了接近一半左右。
         * 
         * 测试总结
         * 对于Parallel.For、Parallel.Foreach的使用应该要特别小心,
         * 它们的优势是处理列表很长,且对列表内的元素进行很复杂的业务逻辑,且不会使用共享资源,
         * 只针对自身的业务逻辑处理,方才能提升效率。
         * 因为如果逻辑过于简单的话,创建线程的花费将大于业务执行的花费,得不偿失。         
         */
        static void Main(string[] args)
        {
            //产生测试资料
            List<int> testData = new List<int>();
            Random Rand = new Random();
            //产生乱数列表
            for (int i = 0; i < 1000000; i++)
            {
                testData.Add(Rand.Next(1000));
            }
            //打印正确结果
            Console.WriteLine(testData.Sum());

            for (int i = 0; i < 5; i++)
            {
                Console.WriteLine();
                TestFor(testData);
                TestParallelFor(testData);
                TestParallelForeach(testData);
            }
            Console.ReadKey();
        }

        static void TestFor(List<int> testData)
        {
            DateTime time1 = DateTime.Now;
            foreach (var item in testData)
            {
                item.ToString();
            }
            Console.WriteLine(string.Format("ForEach:     t{0} in {1}", testData.Sum(), (DateTime.Now - time1).TotalMilliseconds));
        }

        static void TestParallelFor(List<int> testData)
        {
            DateTime time1 = DateTime.Now;
            Parallel.For(0, testData.Count, (i, loopState) =>
            {
                testData[i].ToString();
            });
            Console.WriteLine(string.Format("Parallel.For:   t{0} in {1}", testData.Sum(), (DateTime.Now - time1).TotalMilliseconds));
        }

        static void TestParallelForeach(List<int> testData)
        {
            //记录结果用
            DateTime time1 = DateTime.Now;
            ConcurrentStack<int> resultData = new ConcurrentStack<int>();
            Parallel.ForEach(testData, (item, loopState) =>
            {
                item.ToString();
            });
            Console.WriteLine(string.Format("Parallel.ForEach:t{0} in {1}", testData.Sum(), (DateTime.Now - time1).TotalMilliseconds));
        }
    }
}

Parallel.ForEach

Parallel.ForEach 是 foreach 的四线程完结,他们都能对 IEnumerable<T> 类型对象进行遍历,Parallel.ForEach 的异样的地方在于它选拔四线程来施行循环体内的代码段。

Parallel.ForEach 最常用的花样如下:

public static ParallelLoopResult ForEach<TSource>(
    IEnumerable<TSource> source,
    Action<TSource> body)

必发娱乐365登录 1

PLINQ

PLINQ 也是一种对数码实行并行管理的编制程序模型,它经过 LINQ 的语法来兑现形似Parallel.ForEach 的多线程并行管理。

场馆豆蔻梢头:轻松数据 之 独立操作的并行管理(使用 Parallel.ForEach)

演示代码:

public static void IndependentAction(IEnumerable<T> source, Action<T> action)
{
    Parallel.ForEach(source, element => action(element));
}

理由:

  1. 就算 PLINQ 也提供了一个相近的 ForAll 接口,但它对于简易的独立操作太重量化了。

  2. 动用 Parallel.ForEach 你还是能够够设定 ParallelOptions.MaxDegreeOfParalelism 参数(钦命最多须要有个别个线程),那样当 ThreadPool 财富紧张(以至当可用线程数<MaxDegreeOfParalelism)的时候, Parallel.ForEach 依然能够胜利运维,何况当后续有更加多可用线程现身时,Parallel.ForEach 也能立即地运用这么些线程。PLINQ 只可以通过WithDegreeOfParallelism 方法来必要一定的线程数,即:要求了多少个就是多少个,不会多也不会少。

必发娱乐365登录,情景二:顺序数据 之 并行管理(使用 PLINQ 来维持数据顺序)

当输出的数量连串须要保障原有的次第时采取 PLINQ 的 AsOrdered 方法特轻松高效。

亲自去做代码:

public static void GrayscaleTransformation(IEnumerable<Frame> Movie)
{
    var ProcessedMovie =
        Movie
        .AsParallel()
        .AsOrdered()
        .Select(frame => ConvertToGrayscale(frame));

    foreach (var grayscaleFrame in ProcessedMovie)
    {
        // Movie frames will be evaluated lazily
    }
}

理由:

  1. Parallel.ForEach 落成起来供给绕一些弯路,首先你供给选择以下的重载在点子:

    public static ParallelLoopResult ForEach(

     IEnumerable<TSource> source,
     Action<TSource, ParallelLoopState, Int64> body)
    

其大器晚成重载的 Action 多带有了 index  参数,那样你在输出的时候就能够使用这几个值来保证原本的行列顺序。请看下边包车型客车例子:

public static double [] PairwiseMultiply(double[] v1, double[] v2)
{
    var length = Math.Min(v1.Length, v2.Lenth);
    double[] result = new double[length];
    Parallel.ForEach(v1, (element, loopstate, elementIndex) =>
        result[elementIndex] = element * v2[elementIndex]);
    return result;
}

你只怕已经发掘到那边有个显著的主题材料:大家采纳了定点长度的数组。借使传入的是 IEnumerable 那么您有4个缓和方案:

(1卡塔尔(قطر‎ 调用 IEnumerable.Count(卡塔尔(قطر‎来获取数据长度,然后用那些值实例化二个恒定长度的数组,然后选用上例的代码。

(2卡塔尔(قطر‎ The second option would be to materialize the original collection before using it; in the event that your input data set is prohibitively large, neither of the first two options will be feasible.(没看懂贴最早的小说)

(3卡塔尔(قطر‎第两种办法是接纳再次来到叁个哈希集合的措施,这种方法下平日供给起码2倍于传播数据的内部存款和储蓄器,所以拍卖大数量时请慎用。

(4卡塔尔国 自个儿达成排序算法(有限支撑传入数据与传播数据经过排序后次序大器晚成致)

  1. 对照 PLINQ 的 AsOrdered 方法这么归纳,并且该措施能处理流式的数码,进而允许传入数据是延迟达成的(lazy materialized)

此情此景三:流数据 之 并行管理(使用 PLINQ)

PLINQ 能输出流数据,那些特点在瞬间场面特别有效:

1. 结出集不需假设多少个风流倜傥体化的处理实现的数组,即:任曾几何时间点下内部存款和储蓄器中仅维持数组中的部分信息

  1. 你能够在二个单线程上遍历输出结果(就像是他们曾经存在/管理完了)

示例:

public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
    var StockRiskPortfolio =
        Stocks
        .AsParallel()
        .AsOrdered()
        .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)})
        .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));

    foreach (var stockRisk in StockRiskPortfolio)
    {
        SomeStockComputation(stockRisk.Risk);
        // StockRiskPortfolio will be a stream of results
    }
}

此处运用三个单线程的 foreach 来对 PLINQ 的输出进行一而再一而再再而三管理,日常状态下 foreach 没有需求翘首以待 PLINQ 管理完全部数据就能够开首运作。

PLINQ 也同意钦命输出缓存的章程,具体可参看 PLINQ 的 WithMergeOptions 方法,及 ParallelMergeOptions 枚举

气象四:处理八个聚众(使用 PLINQ)

PLINQ 的 Zip 方法提供了并且遍历多少个集归总开展整合元算的主意,並且它能够与其他查询管理操作结合,完成特别复杂的效果与利益。

示例:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
    return
        a
        .AsParallel()
        .AsOrdered()
        .Select(element => ExpensiveComputation(element))
        .Zip(
            b
            .AsParallel()
            .AsOrdered()
            .Select(element => DifferentExpensiveComputation(element)),
            (a_element, b_element) => Combine(a_element,b_element));
}

【必发娱乐365登录】Parallel.ForEach 的四线程并行管理。示范中的五个数据源能够并行管理,当互相都有一个可用成分时提供给 Zip 进行后续管理(Combine卡塔尔。

Parallel.ForEach 也能兑现近似的 Zip 管理:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
    var numElements = Math.Min(a.Count(), b.Count());
    var result = new T[numElements];
    Parallel.ForEach(a,
        (element, loopstate, index) =>
        {
            var a_element = ExpensiveComputation(element);
            var b_element = DifferentExpensiveComputation(b.ElementAt(index));
            result[index] = Combine(a_element, b_element);
        });
    return result;
}

当然使用 Parallel.ForEach 后您就得投机认但是还是不是要保全原本连串,并且要小心数组越界访谈的难题。

场景五:线程局地变量

Parallel.ForEach 提供了三个线程局地变量的重载,定义如下:

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal,TLocal> body,
    Action<TLocal> localFinally)

选取的演示:

public static List<R> Filtering<T,R>(IEnumerable<T> source)
{
    var results = new List<R>();
    using (SemaphoreSlim sem = new SemaphoreSlim(1))
    {
        Parallel.ForEach(source,
            () => new List<R>(),
            (element, loopstate, localStorage) =>
            {
                bool filter = filterFunction(element);
                if (filter)
                    localStorage.Add(element);
                return localStorage;
            },
            (finalStorage) =>
            {
                lock(myLock)
                {
                    results.AddRange(finalStorage)
                };
            });
    }
    return results;
}

线程局地变量有何样优势呢?请看上面包车型地铁例子(四个网页抓取程序):

public static void UnsafeDownloadUrls ()
{
    WebClient webclient = new WebClient();
    Parallel.ForEach(urls,
        (url,loopstate,index) =>
        {
            webclient.DownloadFile(url, filenames[index] + ".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
        });
}

日常性第风华正茂版代码是这样写的,可是运转时会报错“System.NotSupportedException -> WebClient does not support concurrent I/O operations.”。那是因为多个线程不恐怕同有时间做客同一个 WebClient 对象。所以我们会把 WebClient 对象定义到线程中来:

public static void BAD_DownloadUrls ()
{
    Parallel.ForEach(urls,
        (url,loopstate,index) =>
        {
            WebClient webclient = new WebClient();
            webclient.DownloadFile(url, filenames[index] + ".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
        });
}

校订今后依然有标题,因为您的机械不是服务器,一大波实例化的 WebClient 赶快达到你机器允许的伪造连接上限数。线程局地变量能够解决这么些主题材料:

public static void downloadUrlsSafe()
{
    Parallel.ForEach(urls,
        () => new WebClient(),
        (url, loopstate, index, webclient) =>
        {
            webclient.DownloadFile(url, filenames[index]+".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
            return webclient;
        },
            (webclient) => { });
}

这么的写法保险了作者们能赢得充足的 WebClient 实例,同失常候这几个 WebClient 实例彼此隔断仅仅归于个别关联的线程。

虽说 PLINQ 提供了 ThreadLocal<T> 对象来兑现肖似的成效:

public static void downloadUrl()
{
    var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());
    var res =
        urls
        .AsParallel()
        .ForAll(
            url =>
            {
                webclient.Value.DownloadFile(url, host[url] +".dat"));
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
            });
}

只是请留心:ThreadLocal<T> 相对来讲成本越来越大!

场景五:退出操作 (使用 Parallel.ForEach)

Parallel.ForEach 有个重载申明如下,在这之中含有多少个 ParallelLoopState 对象:

public static ParallelLoopResult ForEach<TSource >(
    IEnumerable<TSource> source,
    Action<TSource, ParallelLoopState> body)

ParallelLoopState.Stop(卡塔尔(英语:State of Qatar)提供了脱离循环的不二秘籍,这种措施要比其他三种艺术更加快。那么些办法文告循环不要再起步实践新的迭代,并尽也许快的生产循环。

ParallelLoopState.IsStopped 属性可用来推断其余迭代是还是不是调用了 Stop 方法。

示例:

public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
    var matchFound = false;
    Parallel.ForEach(TSpace,
        (curValue, loopstate) =>
            {
                if (curValue.Equals(match) )
                {
                    matchFound = true;
                    loopstate.Stop();
                }
            });
    return matchFound;
}

ParallelLoopState.Break(卡塔尔(英语:State of Qatar) 文告循环继续实施本成分前的迭代,但不实践本成分之后的迭代。最前调用 Break 的起功用,并被记录到 ParallelLoopState.LowestBreakIteration 属性中。这种管理方式平常被利用在三个如法炮制的物色管理中,举例您有三个排序过的数组,你想在里头查找匹配成分的矮小 index,那么能够接收以下的代码:

public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
    var loopResult = Parallel.ForEach(source,
        (curValue, loopState, curIndex) =>
        {
            if (curValue.Equals(match))
            {
                loopState.Break();
            }
         });
    var matchedIndex = loopResult.LowestBreakIteration;
    return matchedIndex.HasValue ? matchedIndex : -1;
}