pl-nk v0.4.5
Plonk|Plink|Plank are a set of cross-platform C/C++ frameworks for audio software development
plonk_TaskChannel.h
00001 /*
00002  -------------------------------------------------------------------------------
00003  This file is part of the Plink, Plonk, Plank libraries
00004   by Martin Robinson
00005  
00006  http://code.google.com/p/pl-nk/
00007  
00008  Copyright University of the West of England, Bristol 2011-14
00009  All rights reserved.
00010  
00011  Redistribution and use in source and binary forms, with or without
00012  modification, are permitted provided that the following conditions are met:
00013  
00014  * Redistributions of source code must retain the above copyright
00015    notice, this list of conditions and the following disclaimer.
00016  * Redistributions in binary form must reproduce the above copyright
00017    notice, this list of conditions and the following disclaimer in the
00018    documentation and/or other materials provided with the distribution.
00019  * Neither the name of University of the West of England, Bristol nor 
00020    the names of its contributors may be used to endorse or promote products
00021    derived from this software without specific prior written permission.
00022  
00023  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
00024  ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00025  WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 
00026  DISCLAIMED. IN NO EVENT SHALL UNIVERSITY OF THE WEST OF ENGLAND, BRISTOL BE 
00027  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
00028  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE 
00029  GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 
00030  HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
00031  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 
00032  OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
00033  
00034  This software makes use of third party libraries. For more information see:
00035  doc/license.txt included in the distribution.
00036  -------------------------------------------------------------------------------
00037  */
00038 
00039 #ifndef PLONK_THREADEDCHANNEL_H
00040 #define PLONK_THREADEDCHANNEL_H
00041 
00042 #include "../channel/plonk_ChannelInternalCore.h"
00043 #include "../plonk_GraphForwardDeclarations.h"
00044 
00045 template<class SampleType, Interp::TypeCode InterpTypeCode> class InputTaskChannelInternal;
00046 
00047 template<class SampleType,Interp::TypeCode InterpTypeCode>
00048 struct ChannelData< InputTaskChannelInternal<SampleType,InterpTypeCode> >
00049 {
00050     ChannelInternalCore::Data base;
00051     int numChannels;
00052     int numBuffers;
00053     int priority;
00054     bool resampleInput;
00055 };      
00056 
00057 //------------------------------------------------------------------------------
00058 
00059 class TaskMessageInternal : public SmartPointer
00060 {
00061 public:
00062     TaskMessageInternal() throw()
00063     {
00064     }
00065     
00066     TaskMessageInternal (Text const& t)
00067     :   message (t)
00068     {
00069     }
00070     
00071     TaskMessageInternal (Text const& t, Dynamic const& d)
00072     :   message (t), payload (d)
00073     {
00074     }
00075     
00076     friend class TaskMessage;
00077     
00078 private:
00079     const Text message;
00080     const Dynamic payload;
00081 };
00082 
00083 class TaskMessage : public SmartPointerContainer<TaskMessageInternal>
00084 {
00085 public:
00086     TaskMessage() throw()
00087     :   SmartPointerContainer<TaskMessageInternal> (static_cast<TaskMessageInternal*> (0))
00088     {
00089     }
00090     
00091     TaskMessage (Text const& t)
00092     :   SmartPointerContainer<TaskMessageInternal> (new TaskMessageInternal (t, Dynamic::getNull()))
00093     {
00094     }
00095     
00096     TaskMessage (Text const& t, Dynamic const& d)
00097     :   SmartPointerContainer<TaskMessageInternal> (new TaskMessageInternal (t, d))
00098     {
00099     }
00100     
00101     inline const Text& getMessage() const throw()    { return this->getInternal()->message; }
00102     inline const Dynamic& getPayload() const throw() { return this->getInternal()->payload; }
00103 };
00104 
00105 template<class SampleType>
00106 class TaskBufferInternal : public SmartPointer
00107 {
00108 public:
00109     typedef NumericalArray<SampleType> Buffer;
00110     typedef LockFreeQueue<TaskMessage> TaskMessages;
00111     
00112     TaskBufferInternal (const int size) throw()
00113     :   buffer (Buffer::newClear (size))
00114     {
00115     }
00116     
00117     Buffer buffer;
00118     TaskMessages messages;
00119 };
00120 
00121 template<class SampleType>
00122 class TaskBufferBase : public SmartPointerContainer< TaskBufferInternal<SampleType> >
00123 {
00124 public:
00125     TaskBufferBase() throw()
00126     :   SmartPointerContainer< TaskBufferInternal<SampleType> > (static_cast< TaskBufferInternal<SampleType>* > (0))
00127     {
00128     }
00129     
00130     TaskBufferBase (const int size) throw()
00131     :   SmartPointerContainer< TaskBufferInternal<SampleType> > (new TaskBufferInternal<SampleType> (size))
00132     {
00133     }
00134     
00135     static const TaskBufferBase& getNull() throw()
00136     {
00137         static TaskBufferBase null;
00138         return null;
00139     }
00140 };
00141 
00142 
00144 template<class SampleType, Interp::TypeCode InterpTypeCode>
00145 class InputTaskChannelInternal
00146 //:   public ProxyOwnerChannelInternal<SampleType, PLONK_CHANNELDATA_NAME(InputTaskChannelInternal,SampleType)>
00147 :  public ProxyOwnerChannelInternal<SampleType, ChannelData< InputTaskChannelInternal<SampleType,InterpTypeCode> > >
00148 {
00149 public:
00150 //    typedef PLONK_CHANNELDATA_NAME(InputTaskChannelInternal,SampleType) Data;
00151    
00152     typedef InputTaskChannelInternal<SampleType,InterpTypeCode>         TaskInternal;
00153     typedef ChannelData<TaskInternal>                                   Data;
00154     typedef ChannelBase<SampleType>                                     ChannelType;
00155     typedef ObjectArray<ChannelType>                                    ChannelArrayType;
00156     typedef ProxyOwnerChannelInternal<SampleType,Data>                  Internal;
00157     typedef UnitBase<SampleType>                                        UnitType;
00158     typedef InputDictionary                                             Inputs;
00159     typedef NumericalArray<SampleType>                                  Buffer;
00160     typedef TaskBufferBase<SampleType>                                  TaskBuffer;
00161     typedef ResampleUnit<SampleType,InterpTypeCode>                     ResampleType;
00162 
00163     
00164     //--------------------------------------------------------------------------
00165     
00166     class InputTask :  public Threading::Thread, public Channel::Receiver
00167     {
00168     public:
00169         typedef LockFreeQueue<TaskBuffer> BufferQueue;
00170         
00171         InputTask (InputTaskChannelInternal* o) throw()
00172         :   Threading::Thread (Text ("InputTaskChannelInternal::Thread[" + Text (*(LongLong*)&o) + Text ("]"))),
00173             owner (o),
00174             info (owner->getProcessInfo()),
00175             event (Lock::MutexLock)
00176         {
00177         }
00178         
00179         void changed (ChannelType const& source, Text const& message, Dynamic const& payload) throw()
00180         {
00181             TaskMessage tm (message, payload);
00182             currentTaskBuffer.getInternal()->messages.push (tm);
00183         }
00184                 
00185         ResultCode run() throw()
00186         {
00187             Data& data = owner->getState();
00188             setPriority (data.priority);
00189             
00190             const int numChannels = owner->getNumChannels();
00191             const int numBuffers = owner->getState().numBuffers;;
00192                 
00193             plonk_assert (numBuffers > 0);
00194             plonk_assert (owner->getBlockSize().getValue() > 0);
00195             
00196             const int bufferSize = numChannels * owner->getBlockSize().getValue();
00197             
00198             for (int i = 0; i < numBuffers; ++i)
00199                 activeBuffers.push (TaskBuffer (bufferSize));
00200             
00201             Threading::yield();
00202             
00203             while (!getShouldExit())
00204             {                     
00205                 const int blockSize = owner->getBlockSize().getValue();
00206                 const int numFreeBuffers = freeBuffers.length();
00207                                 
00208                 if (numFreeBuffers > 0)
00209                 {                    
00210                     UnitType& inputUnit (owner->getInputAsUnit (IOKey::Generic));
00211                     plonk_assert (inputUnit.channelsHaveSameBlockSize());
00212                     
00213                     currentTaskBuffer = freeBuffers.pop(); // nothing else pops so must be still available
00214                     Buffer& buffer = currentTaskBuffer.getInternal()->buffer;
00215                     buffer.setSize (blockSize * numChannels, false);
00216                     
00217                     SampleType* bufferSamples = buffer.getArray();
00218 
00219                     for (int channel = 0; channel < numChannels; ++channel)
00220                     {
00221                         const Buffer& inputBuffer (inputUnit.process (info, channel));                    
00222                         const SampleType* inputSamples = inputBuffer.getArray();
00223                         const int inputBufferLength = inputBuffer.length();
00224                         
00225 //                        plonk_assert (buffer.length() == (numChannels * inputBufferLength));
00226                         
00227                         if (buffer.length() == (numChannels * inputBufferLength))
00228                         {
00229                             NumericalArray<SampleType>::copyData (bufferSamples, inputSamples, inputBufferLength);
00230                             bufferSamples += inputBufferLength;
00231                         }
00232                         else
00233                         {
00234                             // probably got deleted..
00235                             buffer.zero();
00236                             break;
00237                         }
00238                     }
00239                     
00240                     activeBuffers.push (currentTaskBuffer);
00241                     currentTaskBuffer = TaskBuffer::getNull();
00242 
00243                     plonk_assert (inputUnit.channelsHaveSameSampleRate());
00244                     info.offsetTimeStamp (owner->getSampleRate().getSampleDurationInTicks() * blockSize);
00245                 }
00246                 
00247                 event.wait();
00248             }
00249             
00250             freeBuffers.clearAll();
00251             activeBuffers.clearAll();
00252             
00253             return 0;
00254         }
00255                 
00256         void end() throw()
00257         {
00258             setShouldExit();
00259             event.signal();
00260     
00261             while (isRunning())
00262                 Threading::sleep (0.0001); // will block!
00263         }
00264     
00265         inline bool pop (TaskBuffer& buffer) throw()
00266         {
00267             return activeBuffers.pop (buffer);
00268         }
00269     
00270         inline void push (TaskBuffer const& buffer) throw()
00271         {
00272             buffer.getInternal()->messages.clear();
00273             freeBuffers.push (buffer);
00274             event.signal();
00275         }
00276     
00277     private:
00278         InputTaskChannelInternal* owner;
00279         ProcessInfo& info;
00280         RNG rng;
00281         BufferQueue activeBuffers;
00282         BufferQueue freeBuffers;
00283         TaskBuffer currentTaskBuffer;
00284         Lock event;
00285     };
00286     
00287     //--------------------------------------------------------------------------
00288     
00289     InputTaskChannelInternal (Inputs const& inputs,
00290                               Data const& data,
00291                               BlockSize const& blockSize,
00292                               SampleRate const& sampleRate,
00293                               ChannelArrayType& channels) throw()
00294     :   Internal (numChannelsInSource (inputs), 
00295                   inputs, data, blockSize, sampleRate,
00296                   channels),
00297         task (this)
00298     {
00299         UnitType& inputUnit (this->getInputAsUnit (IOKey::Generic));
00300         inputUnit.addReceiverToChannels (&task);
00301         
00302         if (data.resampleInput)
00303             inputUnit = ResampleType::ar (inputUnit, 1, blockSize, sampleRate);
00304         
00305         task.start();
00306     }
00307     
00308     ~InputTaskChannelInternal()
00309     {
00310         task.end();
00311         UnitType& inputUnit (this->getInputAsUnit (IOKey::Generic));
00312         inputUnit.removeReceiverFromChannels (&task);
00313     }
00314             
00315     Text getName() const throw()
00316     {
00317         return "Task";
00318     }       
00319     
00320     IntArray getInputKeys() const throw()
00321     {
00322         const IntArray keys (IOKey::Generic);
00323         return keys;
00324     }    
00325         
00326     void initChannel (const int channel) throw()
00327     {               
00328         this->initProxyValue (channel, 0);
00329     }    
00330     
00331     void process (ProcessInfo& info, const int /*channel*/) throw()
00332     {                
00333         const int numChannels = this->getNumChannels();
00334         
00335         TaskBuffer taskBuffer;
00336         
00337         if (task.pop (taskBuffer)) 
00338         {
00339             // could be smarter in here in case the buffer size changes
00340             
00341             Buffer& buffer = taskBuffer.getInternal()->buffer;
00342             SampleType* bufferSamples = buffer.getArray();
00343             
00344             for (int channel = 0; channel < numChannels; ++channel)
00345             {
00346                 Buffer& outputBuffer = this->getOutputBuffer (channel);
00347                 SampleType* const outputSamples = outputBuffer.getArray();
00348                 const int outputBufferLength = outputBuffer.length();  
00349                 
00350                 plonk_assert (buffer.length() == (numChannels * outputBufferLength));
00351                 
00352                 Buffer::copyData (outputSamples, bufferSamples, outputBufferLength);
00353                 bufferSamples += outputBufferLength;                
00354             }
00355             
00356             buffer.zero();
00357             
00358             TaskMessage taskMessage;
00359             while (taskBuffer.getInternal()->messages.pop (taskMessage))
00360                 this->update (taskMessage.getMessage(), taskMessage.getPayload());
00361             
00362             task.push (taskBuffer);
00363         }
00364         else
00365         {            
00366             // buffer underrun or other error
00367             for (int channel = 0; channel < numChannels; ++channel)
00368             {
00369                 Buffer& outputBuffer = this->getOutputBuffer (channel);
00370                 outputBuffer.zero();
00371             }
00372         }        
00373     }
00374     
00375     ProcessInfo& getProcessInfo() throw() { return info; }
00376     
00377 private:
00378     InputTask task;
00379     ProcessInfo info; // private info for this object as we're running out of sync with everything else
00380     
00381     static inline int numChannelsInSource (Inputs const& inputs) throw()
00382     {
00383         return inputs[IOKey::Generic].asUnchecked<UnitType>().getNumChannels();
00384     }
00385     
00386 };
00387 
00388 //------------------------------------------------------------------------------
00389 
00410 template<class SampleType, Interp::TypeCode InterpTypeCode>
00411 class InputTaskUnit
00412 {
00413 public:    
00414     typedef InputTaskChannelInternal<SampleType,InterpTypeCode>     TaskInternal;
00415     typedef typename TaskInternal::Data                             Data;
00416     typedef ChannelBase<SampleType>                                 ChannelType;
00417     typedef ChannelInternal<SampleType,Data>                        Internal;
00418     typedef UnitBase<SampleType>                                    UnitType;
00419     typedef InputDictionary                                         Inputs;
00420     typedef ResampleUnit<SampleType,InterpTypeCode>                 ResampleType;
00421     typedef InputTaskUnit<SampleType,Interp::Lagrange3>             HQ;
00422     
00423     static inline UnitInfos getInfo() throw()
00424     {
00425         const double blockSize = (double)BlockSize::getDefault().getValue();
00426         const double sampleRate = SampleRate::getDefault().getValue();
00427         
00428         return UnitInfo ("InputTask", "Defer a unit's processing to a separate task, thread, process or core.",
00429                          
00430                          // output
00431                          ChannelCount::VariableChannelCount, 
00432                          IOKey::Generic,            Measure::None,      0.0,        IOLimit::None,
00433                          IOKey::End,
00434                          
00435                          // inputs
00436                          IOKey::Generic,            Measure::None,
00437                          IOKey::BufferCount,        Measure::Count,     16.0,       IOLimit::Minimum,   Measure::Count,             1.0,
00438                          IOKey::Priority,           Measure::Percent,   50.0,       IOLimit::Clipped,   Measure::Percent,           0.0, 100.0,
00439                          IOKey::Multiply,           Measure::Factor,    1.0,        IOLimit::None,
00440                          IOKey::Add,                Measure::None,      0.0,        IOLimit::None,
00441                          IOKey::BlockSize,          Measure::Samples,   blockSize,  IOLimit::Minimum,   Measure::Samples,           1.0,
00442                          IOKey::SampleRate,         Measure::Hertz,     sampleRate, IOLimit::Minimum,   Measure::Hertz,             0.0,
00443                          IOKey::End);
00444     }
00445     
00446     static UnitType ar (UnitType const& input,
00447                         const int numBuffers = 16,
00448                         const int priority = 50,
00449                         BlockSize const& preferredBlockSize = BlockSize::noPreference(),
00450                         SampleRate const& preferredSampleRate = SampleRate::noPreference()) throw()
00451     {                     
00452         BlockSize blockSize = BlockSize::decide (input.getBlockSize (0), preferredBlockSize);
00453         SampleRate sampleRate = SampleRate::decide (input.getSampleRate (0), preferredSampleRate);
00454         
00455         // could avoid the resample if we added the function the check if all bs/sr are the same in each channel
00456         
00457         Inputs inputs;
00458 //        inputs.put (IOKey::Generic, ResampleType::ar (input, 1, blockSize, sampleRate));
00459         inputs.put (IOKey::Generic, input);
00460         
00461         Data data = { { -1.0, -1.0 }, 0, numBuffers, priority, true };
00462         
00463         return UnitType::template proxiesFromInputs<TaskInternal> (inputs, 
00464                                                                    data, 
00465                                                                    blockSize, 
00466                                                                    sampleRate);
00467     }
00468     
00469     static UnitType arNoResample (UnitType const& input,
00470                                   const int numBuffers = 16,
00471                                   const int priority = 50,
00472                                   BlockSize const& preferredBlockSize = BlockSize::noPreference(),
00473                                   SampleRate const& preferredSampleRate = SampleRate::noPreference()) throw()
00474     {
00475         BlockSize blockSize = BlockSize::decide (input.getBlockSize (0), preferredBlockSize);
00476         SampleRate sampleRate = SampleRate::decide (input.getSampleRate (0), preferredSampleRate);
00477         
00478         // could avoid the resample if we added the function the check if all bs/sr are the same in each channel
00479         
00480         Inputs inputs;
00481         inputs.put (IOKey::Generic, input);
00482         
00483         Data data = { { -1.0, -1.0 }, 0, numBuffers, priority, false };
00484         
00485         return UnitType::template proxiesFromInputs<TaskInternal> (inputs,
00486                                                                    data,
00487                                                                    blockSize,
00488                                                                    sampleRate);
00489     }
00490 
00491 };
00492 
00493 typedef InputTaskUnit<PLONK_TYPE_DEFAULT> InputTask;
00494 
00495 #endif // PLONK_THREADEDCHANNEL_H
00496 
 All Classes Functions Typedefs Enumerations Enumerator Properties