Расшчапленне працы паміж патокамі

У цяперашні час я апрацоўваю кадры, счытвае з відэа па адным, а затым запісаць іх у файл. Гэта здаецца неэфектыўным і павольным, таму я хацеў бы падзяліць працу паміж некалькімі струменямі.

Мой бягучы код можна падсумаваць наступным чынам:

for(long n = 0; n < totalframes; n++) {
    using(Bitmap frame = vreader.ReadVideoFrame()) {
        Process(frame); //awfully slow
        WriteToFile(frame);
    }
}

Як я магу загрузіць, скажам, чатыры кадра, апрацоўваць іх у чатырох патокаў, чакаць іх усё, каб скончыць, а затым запісаць іх у файл? Вельмі важна, што кадры запісваюцца ў тым жа парадку, як яны былі ў відэа.

3
@FabianBigler: Я падазраю, што гэта апрацоўка, якая павольна, не пісаць.
дададзена аўтар Jon Skeet, крыніца
Клас Bitmap відавочнай выявай прадухіляе больш аднаго патоку ад доступу да яго піксельныя дадзеныя. Толькі адзін паток можа выклікаць LockBits (). Так што самае першае, што вам трэба зрабіць, гэта стварыць дадатковыя растравыя малюнкі, якія з'яўляюцца копіяй зыходнага відэакадры. Глыбокае капіраванне, Clone() не дастаткова добра. Пасля таго, як апрацоўка будзе зроблена, вам трэба склеіць кавалачкі разам. Відавочна, што ёсць значныя выдаткі ў гэтым замяшаныя, але і, хутчэй, ці залежыць лёгка дазваляе алгарытм апрацоўкі такога роду падраздзяленне. Апярэджваючы з ніткамі <�я> не </я> а Slamdunk.
дададзена аўтар Hans Passant, крыніца
Не ўпэўнены, што Parallel.ForEach() - уваход патоку буфера, а на выхадзе з кожнага патоку варта пісаць адразу, калі не раней дадзеных не з'яўляецца выдатным.
дададзена аўтар Martin James, крыніца
Паралельна запіс файлаў можа апынуцца больш павольна, чым захоўванне іх паслядоўна, у залежнасці ад вашага прылады захоўвання дадзеных.
дададзена аўтар Fabian Bigler, крыніца
Быў адказ аб трубаправодзе з прыкладам кода тут некалькі хвілін таму, і цяпер яго няма. WTF?
дададзена аўтар Peter W., крыніца
Я, верагодна, варта ўдакладніць: Я апрацоўкі кадраў перад запісам іх у фармаце GIF. Вось чаму парадак так важны.
дададзена аўтар Peter W., крыніца
Ёсць 2 рэчы, якія вы можаце зрабіць: 1) Зрабіце так, каб функцыя WriteToFile мае параметр прымае нумар кадра 2) Выкарыстоўвайце Parallel для працэсу кадра і, калі ўсе апрацоўваюцца запісаць іх у файл
дададзена аўтар Svexo, крыніца

7 адказы

You can process the frames with for example a Parallel.ForEach(). And then read them with an iterator block (IEnumerable<>).

But the writing needs a little more attention. Make sure you attach a number to each frame and at the end of processing, dump them in a BlockingCollection . Start a separate thread (Task) to process the queue and write the frames in order. This is a classic n-Producer/1-Consumer solution.

5
дададзена
Навошта турбавацца аб замове? Хіба вы не можаце проста выкарыстоўваць AsOrdered , каб пераканацца, што яны ў канчатковым выніку напісана ў тым жа парадку, як яны былі прачытаны? Можа быць, ReadFrames() AsParallel() AsOrdered() Выбраць (ProcessFrame) .S & ZWNJ; ... выбранніка (WriteFrame) ці нешта падобнае?
дададзена аўтар Gabe, крыніца
Так, нешта падобнае - не ўпэўнены, калі ён мае патрэбу ў іншым струмені. Пул патокаў, які ўваходзіць у «кэш reserialization» і прыходзіць да высновы, што ён можа напісаць свае ўласныя дадзеныя, так як усе папярэднія кадры былі напісаны, і, магчыма, кэшуюцца кадры з «ранніх» нітак, можа зрабіць запіс. «ReSerializer», несумненна, павінны былі б быць струменева-калекцыя.
дададзена аўтар Martin James, крыніца
Магчыма, але гэта будзе абцяжарваць рабочыя патокі з I/O на абодвух канцах. Мая прапанова ўжо ўключае ў сябе чытанне (які таксама можа быць зроблена з дапамогай асобнай адной задачы). Прымацаванне няроўнай Запісы у канцы можа парушыць ForEach планавальнік і ці partioner.
дададзена аўтар Henk Holterman, крыніца
Вы можаце напісаць итератор вакол метаду GetNext (). Я не бачу праблемы. А Parallel.For() будзе цяжэй паўторна паслядоўнасць.
дададзена аўтар Henk Holterman, крыніца
Вось дзе спажывец прыходзіць. Ён трымае наступны кадр-NR-на-запісы і яму трэба толькі затрымаць кадры, якія прыбываюць занадта рана, што павінна быць <= N-оф-Нітачны
дададзена аўтар Henk Holterman, крыніца
@Gabe - гэта каштуе размяшчэнне як асобны адказ.
дададзена аўтар Henk Holterman, крыніца
Я не магу выкарыстоўваць ForEach тут, таму што крыніца не з'яўляецца IEnumerable, але адзін Bitmap. AForge.NET-х VideoFileReader толькі мае метад .ReadVideoFrame (), які вяртае наступны кадр.
дададзена аўтар Peter W., крыніца
Ах, прабачце, мозг пердеть. Тым не менш, калі я захаваць кадры і нумары кадраў да апрацоўкі не скончана? Я не магу проста паставіць іх у спісе, не можа быць занадта шмат кадраў для гэтага. І я палічыў за лепшае б не слепа давяраць карыстачу <�я> не </я> спрабуе стварыць GIF з 3 хвіліннага відэа. : D
дададзена аўтар Peter W., крыніца

Гэта дзе вы хочаце Pipeline. Я даволі шмат скапіяваны код з патэрнаў паралельнага праграмавання, і ўвёў дадатковы паралелізм на этапе 2 (я ўключыў прыклады, выкарыстоўваючы як паралельныя задачы і PLINQ). Гэта не занадта складана, гэта працуе, і на маёй скрынцы ён працуе ў шмат разоў хутчэй, чым паслядоўная версія. Вы не можаце бачыць тую ж ступень паляпшэння ў кодзе (таму што я мяркую, што ваш Працэс з'яўляецца трохі больш складаным, чым Thread.Sleep ), але гэта ўсё роўна будзе працаваць хутчэй.

Відавочна, што існуе шмат перашкод з-за дадатковую раўналежнасць і я спрабую, каб адпавядаць вашай аб'ектнай мадэлі. Звярніцеся да старонкі 55 Мадэлі паралельнага праграмавання для арыгінала, без праблем ўзору кода. Гэта рэч прыгажосці, так што не забудзьцеся праверыць яго ( HTTP : //www.microsoft.com/en-au/download/details.aspx ID = 19222 ).

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace PipelineExample
{
    /// 
/// Stack Overflow question 16882318. ///
 
    public class Program
    {
        /// 
/// This is our simulated "file". In essense it will contain the /// ID of each Frame which has been processed and written to file. ///
  
        private static readonly List FrameFile = new List();

        /// 
/// This is a modification of Stephen Toub's Pipelines /// example from Patterns Of Parallel Programming. ///
 
        private static void RunPipeline(VReader vreader, long totalframes)
        {
            var rawFrames = new BlockingCollection();
            var processedFrames = new BlockingCollection();

           //Stage 1: read raw frames.
            var readTask = Task.Run(() =>
            {
                try
                {
                    for (long n = 0; n < totalframes; n++)
                    {
                        rawFrames.Add(vreader.ReadVideoFrame());
                    }
                }
                finally { rawFrames.CompleteAdding(); }
            });

           //Stage 2: process frames in parallel.
            var processTask = Task.Run(() =>
            {
                try
                {
                   //Try both - see which performs better in your scenario.
                    Step2WithParallelTasks(rawFrames, processedFrames);
                    //Step2WithPLinq(rawFrames, processedFrames);
                }
                finally { processedFrames.CompleteAdding(); }
            });

           //Stage 3: write results to file and dispose of the frame.
            var writeTask = Task.Run(() =>
            {
                foreach (var processedFrame in processedFrames.GetConsumingEnumerable())
                {
                    WriteToFile(processedFrame);
                    processedFrame.Dispose();
                }
            });

            Task.WaitAll(readTask, processTask, writeTask);
        }

        /// 
/// Processes frames in rawFrames and adds them to /// processedFrames preserving the original frame order. ///
 
        private static void Step2WithPLinq(BlockingCollection rawFrames, BlockingCollection processedFrames)
        {
            Console.WriteLine("Executing Step 2 via PLinq.");

            var processed = rawFrames.GetConsumingEnumerable()
                .AsParallel()
                .AsOrdered()
                .Select(frame =>
                {
                    Process(frame);
                    return frame;
                });

            foreach (var frame in processed)
            {
                processedFrames.Add(frame);
            }
        }

        /// 
/// Processes frames in rawFrames and adds them to /// processedFrames preserving the original frame order. ///
 
        private static void Step2WithParallelTasks(BlockingCollection rawFrames, BlockingCollection processedFrames)
        {
            Console.WriteLine("Executing Step 2 via parallel tasks.");

            var degreesOfParallellism = Environment.ProcessorCount;
            var inbox = rawFrames.GetConsumingEnumerable();

           //Start our parallel tasks.
            while (true)
            {
                var tasks = inbox
                    .Take(degreesOfParallellism)
                    .Select(frame => Task.Run(() =>
                    {
                        Process(frame);
                        return frame;
                    }))
                    .ToArray();

                if (tasks.Length == 0)
                {
                    break;
                }

                Task.WaitAll(tasks);

                foreach (var t in tasks)
                {
                    processedFrames.Add(t.Result);
                }
            }
        }

        /// 
/// Sequential implementation - as is (for comparison). ///
 
        private static void RunSequential(VReader vreader, long totalframes)
        {
            for (long n = 0; n < totalframes; n++)
            {
                using (var frame = vreader.ReadVideoFrame())
                {
                    Process(frame);
                    WriteToFile(frame);
                }
            }
        }

        /// 
/// Main entry point. ///
 
        private static void Main(string[] args)
        {
           //Arguments.
            long totalframes = 1000;
            var vreader = new VReader();

           //We'll time our run.
            var sw = Stopwatch.StartNew();

           //Try both for comparison.
            //RunSequential(vreader, totalframes);
            RunPipeline(vreader, totalframes);

            sw.Stop();

            Console.WriteLine("Elapsed ms: {0}.", sw.ElapsedMilliseconds);

           //Validation: count, order and contents.
            if (Range(1, totalframes).SequenceEqual(FrameFile))
            {
                Console.WriteLine("Frame count and order of frames in the file are CORRECT.");
            }
            else
            {
                Console.WriteLine("Frame count and order of frames in the file are INCORRECT.");
            }

            Console.ReadLine();
        }

        /// 
/// Simulate CPU work. ///
 
        private static void Process(Bitmap frame)
        {
            Thread.Sleep(10);
        }

        /// 
/// Simulate IO pressure. ///
 
        private static void WriteToFile(Bitmap frame)
        {
            Thread.Sleep(5);
            FrameFile.Add(frame.ID);
        }

        /// 
/// Naive implementation of Enumerable.Range(int, int) for long. ///
 
        private static IEnumerable Range(long start, long count)
        {
            for (long i = start; i < start + count; i++)
            {
                yield return i;
            }
        }

        private class VReader
        {
            public Bitmap ReadVideoFrame()
            {
                return new Bitmap();
            }
        }

        private class Bitmap : IDisposable
        {
            private static int MaxID;
            public readonly long ID;

            public Bitmap()
            {
                this.ID = Interlocked.Increment(ref MaxID);
            }

            public void Dispose()
            {
               //Dummy method.
            }
        }
    }
}
3
дададзена

Для працы на элементах паралельна, выкарыстоўвайце System.Linq 'ы паралельныя метады, такія як ParallelEnumerable.Range() . Каб захаваць элементы ў парадку, вы можаце выкарыстоўваць .AsOrdered() .

ParallelEnumerable.Range(0, totalframes)
                  .AsOrdered()
                  .Select(x => vreader.ReadVideoFrame())
                  .Select(Process)
                  .Select(WriteToFile);
2
дададзена
@Henk: Тэорыя заключаецца ў тым, што ёсць буфер паміж кожнай стадыяй. Гэтыя кадры будуць прачытаныя і забуференными, накіроўваецца на стадыю апрацоўкі, з тым, забуференным там, а затым перайшлі да стадыі напісання ў парадку.
дададзена аўтар Gabe, крыніца
@Kirill: Я не магу сабе ўявіць, які відэа файл можа мець больш кадраў, чым упісацца ў Int . У 60fps, 2 ^ 31 FAMEs больш і год відэа!
дададзена аўтар Gabe, крыніца
Гэта элегантнае, але я не ведаю, як менавіта гэта будзе выконвацца.
дададзена аўтар Henk Holterman, крыніца
ParallelEnumerable.Range з'яўляецца (INT, INT) метад. Арыгінальны пытанне выкарыстоўваецца доўгі . Акрамя таго, ваш Select (Process) лінія азначае, што працэс вяртае Bitmap, які можа ці не можа быць так. У адваротным выпадку гэта выдатны адказ.
дададзена аўтар Kirill Shlenskiy, крыніца
Кірмаш кропка. Я думаю, што я змяню свой адказ, а таксама і пазбавіцца ад гэтага доўгай уродства.
дададзена аўтар Kirill Shlenskiy, крыніца

Можа быць, 4 BackgroundWorker 's. Праходзяць лік ад 1-4 да кожнага, акрамя саміх дадзеных - і ў іх RunWorkerCompleted апрацоўшчык падзеі - праверце, калі ўсе астатнія 3 скончылі ... (Вы можаце выкарыстоўваць Bool [4] для гэтага.)

Наколькі я ведаю - вам не прыйдзецца турбавацца аб 2 RunWorkerCompleted 's выклікаецца ў той жа самы час, таму што яны ўсе працуюць на адной і той жа тэме.

0
дададзена

Так - вам патрэбна ThreadPool, некаторыя тэмы, клас для ўваходных дадзеных малюнка + а «парадкавага нумар» або «нумары кадра», каб вызначыць парадак і струменева-клас «ReSerializer», які мае кантэйнер для кэшавання ўсіх кадраў, атрыманых «выйшаў з ладу» да больш раннія кадры прыходзяць.

0
дададзена

У мяне была аналагічная праблема, якую я спытаў аб ў гэтай тэме .

Я прыдумаў рашэнне, якое, здаецца, працуе добра, але гэта можа здацца занадта складаным для вашых мэтаў.

Яна круціцца вакол вас магчымасць паставіць 3 дэлегатаў: Адзін, каб атрымаць працоўны элемент (у вашым выпадку, ён будзе вяртаць Bitmap ), адзін для апрацоўкі, што працоўны элемент і канчатковым для высновы, што праца пункт. Яна таксама дазваляе паказаць максімальную колькасць адначасовых патокаў, якія будуць працуюць - вы можаце выкарыстоўваць гэта, каб абмежаваць выкарыстанне памяці. Глядзіце numTasks параметраў у ParallelBlockProcessor Канструктар ніжэй.

Толькі апрацоўка дэлегат выклікаецца некалькімі патокамі.

Як і вы, мне трэба, каб пераканацца, што канчатковы вынік быў напісаны ў тым жа парадку, што і першапачатковы ўваход. Я выкарыстаў чаргу прыярытэту для гэтага.

Там можа быць лепшыя рашэнні з выкарыстаннем .NET 4.5 ў TPL, але я быў абмежаваны .Net 4.

Вось код, які я прыдумаў, - я думаю, што вы маглі б адаптаваць яго да вашай праблеме:

Клас ParallelBlockProcessor:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;
using ConsoleApplication1;

namespace Demo
{
    public sealed class ParallelBlockProcessor where T: class
    {
        public delegate T Read();           //Called by only one thread.
        public delegate T Process(T block); //Called simultaneously by multiple threads.
        public delegate void Write(T block);//Called by only one thread.

        public ParallelBlockProcessor(Read read, Process process, Write write, int numTasks = 0)
        {
            Contract.Requires(read != null);
            Contract.Requires(process != null);
            Contract.Requires(write != null);
            Contract.Requires((0 <= numTasks) && (numTasks <= 64));

            _read    = read;
            _process = process;
            _write   = write;

            numTasks = (numTasks > 0) ? numTasks : Environment.ProcessorCount;

            _workPool   = new BlockingCollection(numTasks*2);
            _inputQueue  = new BlockingCollection(numTasks);
            _outputQueue = new ConcurrentPriorityQueue();
            _processors  = new Task[numTasks];

            initWorkItems();
            startProcessors();
            Task.Factory.StartNew(enqueueBlocks);
            _dequeuer = Task.Factory.StartNew(dequeueBlocks);
        }

        private void startProcessors()
        {
            for (int i = 0; i < _processors.Length; ++i)
            {
                _processors[i] = Task.Factory.StartNew(processBlocks);
            }
        }

        private void initWorkItems()
        {
            for (int i = 0; i < _workPool.BoundedCapacity; ++i)
            {
                _workPool.Add(new WorkItem());
            }
        }

        private void enqueueBlocks()
        {
            int index = 0;

            while (true)
            {
                T data = _read();

                if (data == null)
                {
                    _inputQueue.CompleteAdding();
                    _outputQueue.Enqueue(index, null);//Special terminator WorkItem.
                    break;
                }

                WorkItem workItem = _workPool.Take();
                workItem.Data = data;
                workItem.Index = index++;

                _inputQueue.Add(workItem);
            }
        }

        private void dequeueBlocks()
        {
            int index = 0;//Next required index.
            int last = int.MaxValue;

            while (true)
            {
                KeyValuePair workItem;
                _outputQueue.WaitForNewItem();  //There will always be at least one item - the sentinel item.

                while (_outputQueue.TryPeek(out workItem))
                {
                    if (workItem.Value == null)//The sentinel item has a null value to indicate that it's the sentinel.
                    {
                        last = workItem.Key; //The sentinel's key is the index of the last block + 1.
                    }
                    else if (workItem.Key == index)//Is this block the next one that we want?
                    {
                       //Even if new items are added to the queue while we're here, the new items will be lower priority.
                       //Therefore it is safe to assume that the item we will dequeue now is the same one we peeked at.

                        _outputQueue.TryDequeue(out workItem);
                        Contract.Assume(workItem.Key == index);
                        _workPool.Add(new WorkItem());//Free up a work pool item.     
                        _write(workItem.Value);
                        ++index;
                    }
                    else//If it's not the block we want, we know we'll get a new item at some point.
                    {
                        _outputQueue.WaitForNewItem();
                    }

                    if (index == last)
                    {
                        return;
                    }
                }
            }
        }

        private void processBlocks()
        {
            foreach (var block in _inputQueue.GetConsumingEnumerable())
            {
                var processedData = _process(block.Data);
                _outputQueue.Enqueue(block.Index, processedData);
            }
        }

        public bool WaitForFinished(int maxMillisecondsToWait)//Can be Timeout.Infinite.
        {
            return _dequeuer.Wait(maxMillisecondsToWait);
        }

        private sealed class WorkItem//Note: This is mutable.
        {
            public T   Data  { get; set; }
            public int Index { get; set; }
        }

        private readonly Task[] _processors;

        private readonly Task _dequeuer;

        private readonly BlockingCollection _workPool;
        private readonly BlockingCollection _inputQueue;
        private readonly ConcurrentPriorityQueue _outputQueue;

        private readonly Read    _read;
        private readonly Process _process;
        private readonly Write   _write;
    }
}

Чарга Priority (адаптавана з адной кампаніі Microsoft):

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;

namespace ConsoleApplication1
{
    /// 
Provides a thread-safe priority queue data structure.
 
    /// Specifies the type of keys used to prioritize values. 
    /// Specifies the type of elements in the queue. 

    [SuppressMessage("Microsoft.Naming", "CA1711:IdentifiersShouldNotHaveIncorrectSuffix")]
    [SuppressMessage("Microsoft.Naming", "CA1710:IdentifiersShouldHaveCorrectSuffix")]
    [DebuggerDisplay("Count={Count}")] 

    public sealed class ConcurrentPriorityQueue : 
        IProducerConsumerCollection>  
        where TKey : IComparable 
    { 
        /// 
Initializes a new instance of the ConcurrentPriorityQueue class.
 
        public ConcurrentPriorityQueue() {} 

        /// 
Initializes a new instance of the ConcurrentPriorityQueue class that contains elements copied from the specified collection.
 
        /// 
The collection whose elements are copied to the new ConcurrentPriorityQueue. 

        [SuppressMessage("Microsoft.Design", "CA1006:DoNotNestGenericTypesInMemberSignatures")]

        public ConcurrentPriorityQueue(IEnumerable> collection) 
        { 
            if (collection == null) throw new ArgumentNullException("collection"); 
            foreach (var item in collection) _minHeap.Insert(item); 
        } 

        /// 
Adds the key/value pair to the priority queue.
 
        /// 
The priority of the item to be added. 
        /// 
The item to be added. 
        public void Enqueue(TKey priority, TValue value) 
        { 
            Enqueue(new KeyValuePair(priority, value)); 
        } 

        /// 
Adds the key/value pair to the priority queue.
 
        /// 
The key/value pair to be added to the queue. 
        public void Enqueue(KeyValuePair item) 
        {
            lock (_syncLock)
            {
                _minHeap.Insert(item);
                _newItem.Set();
            }
        }

        /// 
Waits for a new item to appear.
        public void WaitForNewItem()
        {
            _newItem.WaitOne();
        }

        /// 
Attempts to remove and return the next prioritized item in the queue.
 
        /// 
 
        /// When this method returns, if the operation was successful, result contains the object removed. If 
        /// no object was available to be removed, the value is unspecified. 
        ///  
        ///  
        /// true if an element was removed and returned from the queue succesfully; otherwise, false. 
        ///  
        public bool TryDequeue(out KeyValuePair result) 
        { 
            result = default(KeyValuePair); 
            lock (_syncLock) 
            { 
                if (_minHeap.Count > 0) 
                { 
                    result = _minHeap.Remove(); 
                    return true; 
                } 
            } 
            return false; 
        } 

        /// 
Attempts to return the next prioritized item in the queue.
 
        /// 
 
        /// When this method returns, if the operation was successful, result contains the object. 
        /// The queue was not modified by the operation. 
        ///  
        ///  
        /// true if an element was returned from the queue succesfully; otherwise, false. 
        ///  
        public bool TryPeek(out KeyValuePair result) 
        { 
            result = default(KeyValuePair); 
            lock (_syncLock) 
            { 
                if (_minHeap.Count > 0) 
                { 
                    result = _minHeap.Peek(); 
                    return true; 
                } 
            } 
            return false; 
        } 

        /// 
Empties the queue.
 
        public void Clear() { lock(_syncLock) _minHeap.Clear(); } 

        /// 
Gets whether the queue is empty.
 
        public bool IsEmpty { get { return Count == 0; } } 

        /// 
Gets the number of elements contained in the queue.
 
        public int Count 
        { 
            get { lock (_syncLock) return _minHeap.Count; } 
        } 

        /// 
Copies the elements of the collection to an array, starting at a particular array index.
 
        /// 
 
        /// The one-dimensional array that is the destination of the elements copied from the queue. 
        ///  
        /// 
 
        /// The zero-based index in array at which copying begins. 
        ///  
        /// The elements will not be copied to the array in any guaranteed order. 
        public void CopyTo(KeyValuePair[] array, int index) 
        { 
            lock (_syncLock) _minHeap.Items.CopyTo(array, index); 
        } 

        /// 
Copies the elements stored in the queue to a new array.
 
        /// A new array containing a snapshot of elements copied from the queue. 
        public KeyValuePair[] ToArray() 
        { 
            lock (_syncLock) 
            { 
                var clonedHeap = new MinBinaryHeap(_minHeap); 
                var result = new KeyValuePair[_minHeap.Count]; 
                for (int i = 0; i < result.Length; i++) 
                { 
                    result[i] = clonedHeap.Remove(); 
                } 
                return result; 
            } 
        } 

        /// 
Attempts to add an item in the queue.
 
        /// 
The key/value pair to be added. 
        ///  
        /// true if the pair was added; otherwise, false. 
        ///  
        bool IProducerConsumerCollection>.TryAdd(KeyValuePair item) 
        { 
            Enqueue(item); 
            return true; 
        } 

        /// 
Attempts to remove and return the next prioritized item in the queue.
 
        /// 
 
        /// When this method returns, if the operation was successful, result contains the object removed. If 
        /// no object was available to be removed, the value is unspecified. 
        ///  
        ///  
        /// true if an element was removed and returned from the queue succesfully; otherwise, false. 
        ///  
        bool IProducerConsumerCollection>.TryTake(out KeyValuePair item) 
        { 
            return TryDequeue(out item); 
        } 

        /// 
Returns an enumerator that iterates through the collection.
 
        /// An enumerator for the contents of the queue. 
        ///  
        /// The enumeration represents a moment-in-time snapshot of the contents of the queue. It does not 
        /// reflect any updates to the collection after GetEnumerator was called. The enumerator is safe to 
        /// use concurrently with reads from and writes to the queue. 
        ///  
        public IEnumerator> GetEnumerator() 
        { 
            var arr = ToArray(); 
            return ((IEnumerable>)arr).GetEnumerator(); 
        } 

        /// 
Returns an enumerator that iterates through a collection.
 
        /// An IEnumerator that can be used to iterate through the collection. 
        IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } 

        /// 
Copies the elements of the collection to an array, starting at a particular array index.
 
        /// 
 
        /// The one-dimensional array that is the destination of the elements copied from the queue. 
        ///  
        /// 
 
        /// The zero-based index in array at which copying begins. 
        ///  
        void ICollection.CopyTo(Array array, int index) 
        { 
            lock (_syncLock) ((ICollection)_minHeap.Items).CopyTo(array, index); 
        } 

        /// 
/// Gets a value indicating whether access to the ICollection is synchronized with the SyncRoot. ///
  
        bool ICollection.IsSynchronized { get { return true; } } 

        /// 
/// Gets an object that can be used to synchronize access to the collection. ///
  
        object ICollection.SyncRoot { get { return _syncLock; } } 

        /// 
Implements a binary heap that prioritizes smaller values.
 
        private sealed class MinBinaryHeap 
        { 
            private readonly List> _items; 

            /// 
Initializes an empty heap.
 
            public MinBinaryHeap() 
            { 
                _items = new List>(); 
            } 

            /// 
Initializes a heap as a copy of another heap instance.
 
            /// 
The heap to copy. 
            /// Key/Value values are not deep cloned. 
            public MinBinaryHeap(MinBinaryHeap heapToCopy) 
            { 
                _items = new List>(heapToCopy.Items); 
            } 

            /// 
Empties the heap.
 
            public void Clear() { _items.Clear(); } 

            /// 
Adds an item to the heap.
 
            public void Insert(KeyValuePair entry) 
            { 
               //Add the item to the list, making sure to keep track of where it was added. 
                _items.Add(entry); 
                int pos = _items.Count - 1; 

               //If the new item is the only item, we're done. 
                if (pos == 0) return; 

               //Otherwise, perform log(n) operations, walking up the tree, swapping 
               //where necessary based on key values 
                while (pos > 0) 
                { 
                   //Get the next position to check 
                    int nextPos = (pos-1)/2; 

                   //Extract the entry at the next position 
                    var toCheck = _items[nextPos]; 

                   //Compare that entry to our new one.  If our entry has a smaller key, move it up. 
                   //Otherwise, we're done. 
                    if (entry.Key.CompareTo(toCheck.Key) < 0) 
                    { 
                        _items[pos] = toCheck; 
                        pos = nextPos; 
                    } 
                    else break; 
                } 

               //Make sure we put this entry back in, just in case 
                _items[pos] = entry; 
            } 

            /// 
Returns the entry at the top of the heap.
 
            public KeyValuePair Peek() 
            { 
               //Returns the first item 
                if (_items.Count == 0) throw new InvalidOperationException("The heap is empty."); 
                return _items[0]; 
            } 

            /// 
Removes the entry at the top of the heap.
 
            public KeyValuePair Remove() 
            { 
               //Get the first item and save it for later (this is what will be returned). 
                if (_items.Count == 0) throw new InvalidOperationException("The heap is empty."); 
                KeyValuePair toReturn = _items[0]; 

               //Remove the first item if there will only be 0 or 1 items left after doing so.   
                if (_items.Count <= 2) _items.RemoveAt(0); 
               //A reheapify will be required for the removal 
                else 
                { 
                   //Remove the first item and move the last item to the front. 
                    _items[0] = _items[_items.Count - 1]; 
                    _items.RemoveAt(_items.Count - 1); 

                   //Start reheapify 
                    int current = 0, possibleSwap = 0; 

                   //Keep going until the tree is a heap 
                    while (true) 
                    { 
                       //Get the positions of the node's children 
                        int leftChildPos = 2 * current + 1; 
                        int rightChildPos = leftChildPos + 1; 

                       //Should we swap with the left child? 
                        if (leftChildPos < _items.Count) 
                        { 
                           //Get the two entries to compare (node and its left child) 
                            var entry1 = _items[current]; 
                            var entry2 = _items[leftChildPos]; 

                           //If the child has a lower key than the parent, set that as a possible swap 
                            if (entry2.Key.CompareTo(entry1.Key) < 0) possibleSwap = leftChildPos; 
                        } 
                        else break;//if can't swap this, we're done 

                       //Should we swap with the right child?  Note that now we check with the possible swap 
                       //position (which might be current and might be left child). 
                        if (rightChildPos < _items.Count) 
                        { 
                           //Get the two entries to compare (node and its left child) 
                            var entry1 = _items[possibleSwap]; 
                            var entry2 = _items[rightChildPos]; 

                           //If the child has a lower key than the parent, set that as a possible swap 
                            if (entry2.Key.CompareTo(entry1.Key) < 0) possibleSwap = rightChildPos; 
                        } 

                       //Now swap current and possible swap if necessary 
                        if (current != possibleSwap) 
                        { 
                            var temp = _items[current]; 
                            _items[current] = _items[possibleSwap]; 
                            _items[possibleSwap] = temp; 
                        } 
                        else break;//if nothing to swap, we're done 

                       //Update current to the location of the swap 
                        current = possibleSwap; 
                    } 
                } 

               //Return the item from the heap 
                return toReturn; 
            } 

            /// 
Gets the number of objects stored in the heap.
 
            public int Count { get { return _items.Count; } } 

            internal List> Items { get { return _items; } } 
        }

        private readonly AutoResetEvent _newItem = new AutoResetEvent(false);
        private readonly object _syncLock = new object();
        private readonly MinBinaryHeap _minHeap = new MinBinaryHeap();
    } 
}

Праграма выпрабаванняў:

using System;
using System.Diagnostics;
using System.Threading;

namespace Demo
{
    public static class Program
    {
        private static void Main(string[] args)
        {
            _rng = new Random(34324);

            int threadCount = 8;
            int maxBlocks = 200;
            ThreadPool.SetMinThreads(threadCount + 2, 4); //Kludge!

            var stopwatch = new Stopwatch();

            _numBlocks = maxBlocks;
            stopwatch.Restart();
            var processor = new ParallelBlockProcessor(read, process, write, threadCount);
            processor.WaitForFinished(Timeout.Infinite);

            Console.WriteLine("\n\nFinished in " + stopwatch.Elapsed + "\n\n");
        }

        private static byte[] read()
        {
            if (_numBlocks-- == 0)
            {
                return null;
            }

            var result = new byte[128];
            result[0] = (byte)_numBlocks;
            Console.WriteLine("Supplied input: " + _numBlocks);
            return result;
        }

        private static byte[] process(byte[] data)
        {
            if (data[0] == 190)/*!*/
            {
                Thread.Sleep(5000);
            }

            Thread.Sleep(10 + _rng.Next(50));
            Console.WriteLine("Processed: " + data[0]);
            return data;
        }

        private static void write(byte[] data)
        {
            Console.WriteLine("Received output: " + data[0]);
        }

        private static Random _rng;
        private static int _numBlocks;
    }
}
0
дададзена

Вы можаце выкарыстоўваць CountdownEVent, які дазваляе чакаць больш аднаго патоку.

прыклад:     статычная CountdownEvent _countdown = новы CountdownEvent (3);

static void Main()
{
  new Thread (SaySomething).Start ("I am thread 1");
  new Thread (SaySomething).Start ("I am thread 2");
  new Thread (SaySomething).Start ("I am thread 3");

  _countdown.Wait();  //Blocks until Signal has been called 3 times
  Console.WriteLine ("All threads have finished speaking!");
}

static void SaySomething (object thing)
{
  Thread.Sleep (1000);
  Console.WriteLine (thing);
  _countdown.Signal();
}

Гэты Кодэкс не гарантуе, што патокі 1-3 будуць выконвацца ў тым парадку, аднак, калі вы выклікаеце метад сігналу першым, я лічу, што павінна вырашыць

Яшчэ больш эфектыўны падыходам будзе выглядаць рэалізаваць Monitor.Pulse() і Monitor.Wait() механізм, вы маглі б выкарыстоўваць гэта ў злучэнні з Thread.Sleep ў тэорыі, каб змясціць нітка спаць, калі ён завяршыў выкананне крытычнай секцыі у вашым выпадку кадр. пасля таго, як адзін паток скончыла апрацоўвае кадр пакласці, што нітка для сну і пульсаваць які чакае паток рабіць гэта бесперапынна да таго часу, пакуль усе кадры будуць завершаны, а затым прачынаюцца ніткі .... Нітка складаная, бо іх цяжка ведаць, калі яны скончаць выкананне.

0
дададзена