当我们使用.net 4.0中的任务并行库的时候,有时候我们是需要自己控制并发粒度(调度线程数)的,这个时候往往就需要我们自己写TaskScheduler了,一个简单的实现如下:

View Code

public sealed class SimpleTaskScheduler : TaskScheduler, IDisposable
{
    BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    List<Thread> _threads = new List<Thread>();

    public SimpleTaskScheduler(int initNumberOfThreads = 3)
    {
        if (initNumberOfThreads < 1)
            throw new ArgumentOutOfRangeException();

        _threads.AddRange(Enumerable.Range(0, initNumberOfThreads).Select(_ => CreateThread()));
    }

    Thread CreateThread()
    {
        var thread = new Thread(() =>
        {
            foreach (var t in _tasks.GetConsumingEnumerable())
            {
                TryExecuteTask(t);
            }
        });

        thread.IsBackground = true;
        thread.Start();
        return thread;
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        //这个函数好像没有调过,返回null也不影响功能
        return _tasks.ToArray();
    }

    protected override void QueueTask(Task task)
    {
        _tasks.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return TryExecuteTask(task);
    }

    public override int MaximumConcurrencyLevel { get { return _threads.Count; } }

    #region IDisposable 成员

    public void Dispose()
    {
        if (_tasks == null)        //防止重入
            return;

        _tasks.CompleteAdding();
        _threads.ForEach(t => t.Join());

        _tasks.Dispose();
        _tasks = null;
    }

    #endregion
}

 

这个类实现并不复杂,但其实用得还是比较多的,这里记录一下,以备后续查询。

PS:当前在Parallel.ForEachParallel.For等数据并发函数中可以通过ParallelOptions.MaxDegreeOfParallelism来控制并发粒度,但无法控制调度顺序。也可以通过类似这样的TaskScheduler来改变调度顺序。

 

版权声明:本文为TianFang原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/TianFang/archive/2011/10/05/2199526.html