![]() |
pl-nk v0.4.5
Plonk|Plink|Plank are a set of cross-platform C/C++ frameworks for audio software development
|
00001 /* 00002 ------------------------------------------------------------------------------- 00003 This file is part of the Plink, Plonk, Plank libraries 00004 by Martin Robinson 00005 00006 http://code.google.com/p/pl-nk/ 00007 00008 Copyright University of the West of England, Bristol 2011-14 00009 All rights reserved. 00010 00011 Redistribution and use in source and binary forms, with or without 00012 modification, are permitted provided that the following conditions are met: 00013 00014 * Redistributions of source code must retain the above copyright 00015 notice, this list of conditions and the following disclaimer. 00016 * Redistributions in binary form must reproduce the above copyright 00017 notice, this list of conditions and the following disclaimer in the 00018 documentation and/or other materials provided with the distribution. 00019 * Neither the name of University of the West of England, Bristol nor 00020 the names of its contributors may be used to endorse or promote products 00021 derived from this software without specific prior written permission. 00022 00023 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 00024 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00025 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 00026 DISCLAIMED. IN NO EVENT SHALL UNIVERSITY OF THE WEST OF ENGLAND, BRISTOL BE 00027 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00028 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE 00029 GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 00030 HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00031 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 00032 OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00033 00034 This software makes use of third party libraries. For more information see: 00035 doc/license.txt included in the distribution. 00036 ------------------------------------------------------------------------------- 00037 */ 00038 00039 #ifndef PLONK_THREADEDCHANNEL_H 00040 #define PLONK_THREADEDCHANNEL_H 00041 00042 #include "../channel/plonk_ChannelInternalCore.h" 00043 #include "../plonk_GraphForwardDeclarations.h" 00044 00045 template<class SampleType, Interp::TypeCode InterpTypeCode> class InputTaskChannelInternal; 00046 00047 template<class SampleType,Interp::TypeCode InterpTypeCode> 00048 struct ChannelData< InputTaskChannelInternal<SampleType,InterpTypeCode> > 00049 { 00050 ChannelInternalCore::Data base; 00051 int numChannels; 00052 int numBuffers; 00053 int priority; 00054 bool resampleInput; 00055 }; 00056 00057 //------------------------------------------------------------------------------ 00058 00059 class TaskMessageInternal : public SmartPointer 00060 { 00061 public: 00062 TaskMessageInternal() throw() 00063 { 00064 } 00065 00066 TaskMessageInternal (Text const& t) 00067 : message (t) 00068 { 00069 } 00070 00071 TaskMessageInternal (Text const& t, Dynamic const& d) 00072 : message (t), payload (d) 00073 { 00074 } 00075 00076 friend class TaskMessage; 00077 00078 private: 00079 const Text message; 00080 const Dynamic payload; 00081 }; 00082 00083 class TaskMessage : public SmartPointerContainer<TaskMessageInternal> 00084 { 00085 public: 00086 TaskMessage() throw() 00087 : SmartPointerContainer<TaskMessageInternal> (static_cast<TaskMessageInternal*> (0)) 00088 { 00089 } 00090 00091 TaskMessage (Text const& t) 00092 : SmartPointerContainer<TaskMessageInternal> (new TaskMessageInternal (t, Dynamic::getNull())) 00093 { 00094 } 00095 00096 TaskMessage (Text const& t, Dynamic const& d) 00097 : SmartPointerContainer<TaskMessageInternal> (new TaskMessageInternal (t, d)) 00098 { 00099 } 00100 00101 inline const Text& getMessage() const throw() { return this->getInternal()->message; } 00102 inline const Dynamic& getPayload() const throw() { return this->getInternal()->payload; } 00103 }; 00104 00105 template<class SampleType> 00106 class TaskBufferInternal : public SmartPointer 00107 { 00108 public: 00109 typedef NumericalArray<SampleType> Buffer; 00110 typedef LockFreeQueue<TaskMessage> TaskMessages; 00111 00112 TaskBufferInternal (const int size) throw() 00113 : buffer (Buffer::newClear (size)) 00114 { 00115 } 00116 00117 Buffer buffer; 00118 TaskMessages messages; 00119 }; 00120 00121 template<class SampleType> 00122 class TaskBufferBase : public SmartPointerContainer< TaskBufferInternal<SampleType> > 00123 { 00124 public: 00125 TaskBufferBase() throw() 00126 : SmartPointerContainer< TaskBufferInternal<SampleType> > (static_cast< TaskBufferInternal<SampleType>* > (0)) 00127 { 00128 } 00129 00130 TaskBufferBase (const int size) throw() 00131 : SmartPointerContainer< TaskBufferInternal<SampleType> > (new TaskBufferInternal<SampleType> (size)) 00132 { 00133 } 00134 00135 static const TaskBufferBase& getNull() throw() 00136 { 00137 static TaskBufferBase null; 00138 return null; 00139 } 00140 }; 00141 00142 00144 template<class SampleType, Interp::TypeCode InterpTypeCode> 00145 class InputTaskChannelInternal 00146 //: public ProxyOwnerChannelInternal<SampleType, PLONK_CHANNELDATA_NAME(InputTaskChannelInternal,SampleType)> 00147 : public ProxyOwnerChannelInternal<SampleType, ChannelData< InputTaskChannelInternal<SampleType,InterpTypeCode> > > 00148 { 00149 public: 00150 // typedef PLONK_CHANNELDATA_NAME(InputTaskChannelInternal,SampleType) Data; 00151 00152 typedef InputTaskChannelInternal<SampleType,InterpTypeCode> TaskInternal; 00153 typedef ChannelData<TaskInternal> Data; 00154 typedef ChannelBase<SampleType> ChannelType; 00155 typedef ObjectArray<ChannelType> ChannelArrayType; 00156 typedef ProxyOwnerChannelInternal<SampleType,Data> Internal; 00157 typedef UnitBase<SampleType> UnitType; 00158 typedef InputDictionary Inputs; 00159 typedef NumericalArray<SampleType> Buffer; 00160 typedef TaskBufferBase<SampleType> TaskBuffer; 00161 typedef ResampleUnit<SampleType,InterpTypeCode> ResampleType; 00162 00163 00164 //-------------------------------------------------------------------------- 00165 00166 class InputTask : public Threading::Thread, public Channel::Receiver 00167 { 00168 public: 00169 typedef LockFreeQueue<TaskBuffer> BufferQueue; 00170 00171 InputTask (InputTaskChannelInternal* o) throw() 00172 : Threading::Thread (Text ("InputTaskChannelInternal::Thread[" + Text (*(LongLong*)&o) + Text ("]"))), 00173 owner (o), 00174 info (owner->getProcessInfo()), 00175 event (Lock::MutexLock) 00176 { 00177 } 00178 00179 void changed (ChannelType const& source, Text const& message, Dynamic const& payload) throw() 00180 { 00181 TaskMessage tm (message, payload); 00182 currentTaskBuffer.getInternal()->messages.push (tm); 00183 } 00184 00185 ResultCode run() throw() 00186 { 00187 Data& data = owner->getState(); 00188 setPriority (data.priority); 00189 00190 const int numChannels = owner->getNumChannels(); 00191 const int numBuffers = owner->getState().numBuffers;; 00192 00193 plonk_assert (numBuffers > 0); 00194 plonk_assert (owner->getBlockSize().getValue() > 0); 00195 00196 const int bufferSize = numChannels * owner->getBlockSize().getValue(); 00197 00198 for (int i = 0; i < numBuffers; ++i) 00199 activeBuffers.push (TaskBuffer (bufferSize)); 00200 00201 Threading::yield(); 00202 00203 while (!getShouldExit()) 00204 { 00205 const int blockSize = owner->getBlockSize().getValue(); 00206 const int numFreeBuffers = freeBuffers.length(); 00207 00208 if (numFreeBuffers > 0) 00209 { 00210 UnitType& inputUnit (owner->getInputAsUnit (IOKey::Generic)); 00211 plonk_assert (inputUnit.channelsHaveSameBlockSize()); 00212 00213 currentTaskBuffer = freeBuffers.pop(); // nothing else pops so must be still available 00214 Buffer& buffer = currentTaskBuffer.getInternal()->buffer; 00215 buffer.setSize (blockSize * numChannels, false); 00216 00217 SampleType* bufferSamples = buffer.getArray(); 00218 00219 for (int channel = 0; channel < numChannels; ++channel) 00220 { 00221 const Buffer& inputBuffer (inputUnit.process (info, channel)); 00222 const SampleType* inputSamples = inputBuffer.getArray(); 00223 const int inputBufferLength = inputBuffer.length(); 00224 00225 // plonk_assert (buffer.length() == (numChannels * inputBufferLength)); 00226 00227 if (buffer.length() == (numChannels * inputBufferLength)) 00228 { 00229 NumericalArray<SampleType>::copyData (bufferSamples, inputSamples, inputBufferLength); 00230 bufferSamples += inputBufferLength; 00231 } 00232 else 00233 { 00234 // probably got deleted.. 00235 buffer.zero(); 00236 break; 00237 } 00238 } 00239 00240 activeBuffers.push (currentTaskBuffer); 00241 currentTaskBuffer = TaskBuffer::getNull(); 00242 00243 plonk_assert (inputUnit.channelsHaveSameSampleRate()); 00244 info.offsetTimeStamp (owner->getSampleRate().getSampleDurationInTicks() * blockSize); 00245 } 00246 00247 event.wait(); 00248 } 00249 00250 freeBuffers.clearAll(); 00251 activeBuffers.clearAll(); 00252 00253 return 0; 00254 } 00255 00256 void end() throw() 00257 { 00258 setShouldExit(); 00259 event.signal(); 00260 00261 while (isRunning()) 00262 Threading::sleep (0.0001); // will block! 00263 } 00264 00265 inline bool pop (TaskBuffer& buffer) throw() 00266 { 00267 return activeBuffers.pop (buffer); 00268 } 00269 00270 inline void push (TaskBuffer const& buffer) throw() 00271 { 00272 buffer.getInternal()->messages.clear(); 00273 freeBuffers.push (buffer); 00274 event.signal(); 00275 } 00276 00277 private: 00278 InputTaskChannelInternal* owner; 00279 ProcessInfo& info; 00280 RNG rng; 00281 BufferQueue activeBuffers; 00282 BufferQueue freeBuffers; 00283 TaskBuffer currentTaskBuffer; 00284 Lock event; 00285 }; 00286 00287 //-------------------------------------------------------------------------- 00288 00289 InputTaskChannelInternal (Inputs const& inputs, 00290 Data const& data, 00291 BlockSize const& blockSize, 00292 SampleRate const& sampleRate, 00293 ChannelArrayType& channels) throw() 00294 : Internal (numChannelsInSource (inputs), 00295 inputs, data, blockSize, sampleRate, 00296 channels), 00297 task (this) 00298 { 00299 UnitType& inputUnit (this->getInputAsUnit (IOKey::Generic)); 00300 inputUnit.addReceiverToChannels (&task); 00301 00302 if (data.resampleInput) 00303 inputUnit = ResampleType::ar (inputUnit, 1, blockSize, sampleRate); 00304 00305 task.start(); 00306 } 00307 00308 ~InputTaskChannelInternal() 00309 { 00310 task.end(); 00311 UnitType& inputUnit (this->getInputAsUnit (IOKey::Generic)); 00312 inputUnit.removeReceiverFromChannels (&task); 00313 } 00314 00315 Text getName() const throw() 00316 { 00317 return "Task"; 00318 } 00319 00320 IntArray getInputKeys() const throw() 00321 { 00322 const IntArray keys (IOKey::Generic); 00323 return keys; 00324 } 00325 00326 void initChannel (const int channel) throw() 00327 { 00328 this->initProxyValue (channel, 0); 00329 } 00330 00331 void process (ProcessInfo& info, const int /*channel*/) throw() 00332 { 00333 const int numChannels = this->getNumChannels(); 00334 00335 TaskBuffer taskBuffer; 00336 00337 if (task.pop (taskBuffer)) 00338 { 00339 // could be smarter in here in case the buffer size changes 00340 00341 Buffer& buffer = taskBuffer.getInternal()->buffer; 00342 SampleType* bufferSamples = buffer.getArray(); 00343 00344 for (int channel = 0; channel < numChannels; ++channel) 00345 { 00346 Buffer& outputBuffer = this->getOutputBuffer (channel); 00347 SampleType* const outputSamples = outputBuffer.getArray(); 00348 const int outputBufferLength = outputBuffer.length(); 00349 00350 plonk_assert (buffer.length() == (numChannels * outputBufferLength)); 00351 00352 Buffer::copyData (outputSamples, bufferSamples, outputBufferLength); 00353 bufferSamples += outputBufferLength; 00354 } 00355 00356 buffer.zero(); 00357 00358 TaskMessage taskMessage; 00359 while (taskBuffer.getInternal()->messages.pop (taskMessage)) 00360 this->update (taskMessage.getMessage(), taskMessage.getPayload()); 00361 00362 task.push (taskBuffer); 00363 } 00364 else 00365 { 00366 // buffer underrun or other error 00367 for (int channel = 0; channel < numChannels; ++channel) 00368 { 00369 Buffer& outputBuffer = this->getOutputBuffer (channel); 00370 outputBuffer.zero(); 00371 } 00372 } 00373 } 00374 00375 ProcessInfo& getProcessInfo() throw() { return info; } 00376 00377 private: 00378 InputTask task; 00379 ProcessInfo info; // private info for this object as we're running out of sync with everything else 00380 00381 static inline int numChannelsInSource (Inputs const& inputs) throw() 00382 { 00383 return inputs[IOKey::Generic].asUnchecked<UnitType>().getNumChannels(); 00384 } 00385 00386 }; 00387 00388 //------------------------------------------------------------------------------ 00389 00410 template<class SampleType, Interp::TypeCode InterpTypeCode> 00411 class InputTaskUnit 00412 { 00413 public: 00414 typedef InputTaskChannelInternal<SampleType,InterpTypeCode> TaskInternal; 00415 typedef typename TaskInternal::Data Data; 00416 typedef ChannelBase<SampleType> ChannelType; 00417 typedef ChannelInternal<SampleType,Data> Internal; 00418 typedef UnitBase<SampleType> UnitType; 00419 typedef InputDictionary Inputs; 00420 typedef ResampleUnit<SampleType,InterpTypeCode> ResampleType; 00421 typedef InputTaskUnit<SampleType,Interp::Lagrange3> HQ; 00422 00423 static inline UnitInfos getInfo() throw() 00424 { 00425 const double blockSize = (double)BlockSize::getDefault().getValue(); 00426 const double sampleRate = SampleRate::getDefault().getValue(); 00427 00428 return UnitInfo ("InputTask", "Defer a unit's processing to a separate task, thread, process or core.", 00429 00430 // output 00431 ChannelCount::VariableChannelCount, 00432 IOKey::Generic, Measure::None, 0.0, IOLimit::None, 00433 IOKey::End, 00434 00435 // inputs 00436 IOKey::Generic, Measure::None, 00437 IOKey::BufferCount, Measure::Count, 16.0, IOLimit::Minimum, Measure::Count, 1.0, 00438 IOKey::Priority, Measure::Percent, 50.0, IOLimit::Clipped, Measure::Percent, 0.0, 100.0, 00439 IOKey::Multiply, Measure::Factor, 1.0, IOLimit::None, 00440 IOKey::Add, Measure::None, 0.0, IOLimit::None, 00441 IOKey::BlockSize, Measure::Samples, blockSize, IOLimit::Minimum, Measure::Samples, 1.0, 00442 IOKey::SampleRate, Measure::Hertz, sampleRate, IOLimit::Minimum, Measure::Hertz, 0.0, 00443 IOKey::End); 00444 } 00445 00446 static UnitType ar (UnitType const& input, 00447 const int numBuffers = 16, 00448 const int priority = 50, 00449 BlockSize const& preferredBlockSize = BlockSize::noPreference(), 00450 SampleRate const& preferredSampleRate = SampleRate::noPreference()) throw() 00451 { 00452 BlockSize blockSize = BlockSize::decide (input.getBlockSize (0), preferredBlockSize); 00453 SampleRate sampleRate = SampleRate::decide (input.getSampleRate (0), preferredSampleRate); 00454 00455 // could avoid the resample if we added the function the check if all bs/sr are the same in each channel 00456 00457 Inputs inputs; 00458 // inputs.put (IOKey::Generic, ResampleType::ar (input, 1, blockSize, sampleRate)); 00459 inputs.put (IOKey::Generic, input); 00460 00461 Data data = { { -1.0, -1.0 }, 0, numBuffers, priority, true }; 00462 00463 return UnitType::template proxiesFromInputs<TaskInternal> (inputs, 00464 data, 00465 blockSize, 00466 sampleRate); 00467 } 00468 00469 static UnitType arNoResample (UnitType const& input, 00470 const int numBuffers = 16, 00471 const int priority = 50, 00472 BlockSize const& preferredBlockSize = BlockSize::noPreference(), 00473 SampleRate const& preferredSampleRate = SampleRate::noPreference()) throw() 00474 { 00475 BlockSize blockSize = BlockSize::decide (input.getBlockSize (0), preferredBlockSize); 00476 SampleRate sampleRate = SampleRate::decide (input.getSampleRate (0), preferredSampleRate); 00477 00478 // could avoid the resample if we added the function the check if all bs/sr are the same in each channel 00479 00480 Inputs inputs; 00481 inputs.put (IOKey::Generic, input); 00482 00483 Data data = { { -1.0, -1.0 }, 0, numBuffers, priority, false }; 00484 00485 return UnitType::template proxiesFromInputs<TaskInternal> (inputs, 00486 data, 00487 blockSize, 00488 sampleRate); 00489 } 00490 00491 }; 00492 00493 typedef InputTaskUnit<PLONK_TYPE_DEFAULT> InputTask; 00494 00495 #endif // PLONK_THREADEDCHANNEL_H 00496