April 17, 2014
Hot Topics:
RSS RSS feed Download our iPhone app

For Brew Developers, There Are New Kids in Town: IThread & IRscPool, Part 2

  • November 6, 2003
  • By Radu Braniste
  • Send Email »
  • More Articles »

Abstract:

We will continue our journey, trying to avoid the limitations discovered in the last installment. We'll start with a short evaluation of the techniques used in the first article, hopefully allowing us to refine our initial approach. Some design variations will be presented too.

Known Facts

Using IThread and IRscPool from C is straightforward. Together with the ThreadUtil library, they offer a solid solution for a "task-oriented" style of programming in BREW. Using them from C++ is less obvious, and this because at least two reasons:

  1. IRscPool was intended as a "memory arena" with no knowledge about object construction and destruction (other than IBase-derived objects, but this is a BREW construct not very useful in C++). Using IRscPool implies using placement new and explicit destruction—an extremely inconvenient detail.
  2. IThread is poorly equipped for asynchronous cancellation—the stack is not unwound and this simply forbids the use of automatic objects with nontrivial destructors.

Let's start from the bottom up and try to figure out the minimal artifacts needed to solve or at least alleviate these problems.

Safe asynchronous cancellation implies that at any time the thread has to be prepared to respond to an interrupt signal by properly cleaning the resources, in a predictable and consistent manner, leaving the system in a stable state. BREW threads are cooperative—an additional bonus: An interrupt signal is guaranteed to be received in a stable state. A solution might be to intercept the interrupt call, do the necessary clean up on behalf of the thread, and pass control back to the caller. What's needed?

  1. A "god" instance having knowledge of the threads and announcing them when an interrupt occurs
  2. A mechanism to check for interrupts
  3. A blocking call to unwind the stack
  4. A way to correctly destruct objects involved in the execution of a particular thread

If comparing a., d., and Problem 1, we can easily see that they share a common request for a container with special management properties: Other than proper cleanup work, the container should intercept and dispatch the calls. Such a container solves Problem 1 and puts Problem 2 on a fast track.

We've already localized the need for such a container in a previous article (http://developer.com/ws/brew/article.php/1497121) and used a type-based materialization of the concept in "A Generic Connection Framework for BREW" (http://www.developer.com/ws/brew/article.php/2242101).

This time, we will use a polymorphic container (ResourceRegistry), manipulating IRelease interfaces. This choice is deliberate, mimicking the IRscPool/IBase pair.

Here are the public interfaces:

class IRelease
{
public:
  virtual void releaseResource() = 0;
  virtual int getType() const    = 0;
  virtual int getID() const      = 0;
  virtual void notify()          = 0;
  virtual void interrupt()       = 0;
};

class ResourceRegistry
{
public:
  int registerResource(IRelease* resource);
  bool unregisterResource(int uid);
  IRelease* getRegistered(int uid) const;
  typedef void (IRelease::*FUNC)();
  int notifyAll(void (IRelease::*f)(), int type = -1) const;
  int notify(void (IRelease::*f)(), int id) const;
  int interruptAll(int type = -1) const;
  int interrupt(int id) const;
  ~ResourceRegistry() ;
  ResourceRegistry();
};

IRelease defines a class of interruptible resources, capable of sending notifications and identifiable by ID and type. ResourceRegistry stores and manipulates IRelease-based resources, closely following IRelease exposed functionality. We will go into implementation details a little bit later.

In the previous article, IThread was packaged in a wrapper—BThread—and the advantages of such an approach were fully explained. This time, as promised, BThread (renamed BThreadT) will have the benefit of more functionality, resembling Java Thread implementation. This decision is somehow arbitrary but not without merits: It keeps the implementation simple and focused on the virtues of cooperative multithreading (for example mutual exclusion is naturally achieved in such "serialized" systems—there is rarely any need for a "synchronized" keyword or a mutex). So, for example, we will find, wait, and notify.

BThreadT implements IRelease—meaning, among other things already discussed, that it is interruptible and notifiable. It exposes the following public functionality:

template <class R, class RR = ResourceRegistry,
          class P = BThreadPolicy>
class BThreadPP : public IRelease
{
public:
  static BThreadPP* createThread(IShell* shell, RR *rsc,
                                 R* runnable  = 0);
  int start();
  int sleep(int ms);
  int wait();
  void notifyAll();
  void notify(int id);
  int yield();
  template <class T>
  static void join(T* t, void* v, PFNNOTIFY fx);
  void setCbk(void* v, PFNNOTIFY fx); 
  IThread* getNativeThread();
//IRelease
  virtual void releaseResource();
  virtual int getType() ;
  virtual int getID() ;
  virtual void notify();
  virtual void interrupt();
}

Every BThreadT has an associated "runnable"—the mysterious R in the template list. A runnable is basically a functor implementing the thread's loop. An alternative design, based on dynamic polymorphism, is also possible: "Runnable" R is a descendent of an IRunnable in this case. Other than a slight performance penalty affecting the dynamic version (static vs. dynamic, functor vs. virtual callback) and the type coupling that might be counterproductive in some usage scenarios, the two designs are fully equivalent.

Internally, BThreadT hosts an interesting method: checkInterrupt(). Almost all the public methods do a check of the interrupted status (isInterrupted_) (whose change might be trigger by interrupt()) via checkInterrupt. If isInterrupted is true, an EINTR error is raised. That's why all the methods usually returning void return an int in this implementation and their return code should be checked for EINTR.

Let's take a closer look at this mechanism:

void operator()(BThreadPP<RunnableImplSimple>* t)
  {
    Writer writer(shell_);
    while (cursor_ < 3000)
    {
      String s = String((long)cursor_);
      writer.DisplayOutput(1,s.toCharArray());
      if (cursor_ % 10 == 0) 
      {
        if (t->yield() == EINTR)
          break;
      }
      ++cursor_;
    }
  }

This is an example of a very simple function to be executed by a BThreadT. Please note the automatic objects created on the stack—potential problems for asynchronous cancellations. When user exits the application (unexpectedly or not), the ResourceRegistry is destroyed, triggering a global interrupt. The main advantage of ResourceRegistry over a pool is that in a Registry resources are kept as individual, enumerable instances—and this is how interrupts are propagated to the active threads. Interestingly enough, an interrupt has to be synchronous (we are in the middle of a destructor activity, remember?). That's why we have the following implementation:

virtual void interrupt()
  {
    if (pcb_)
    {
      isInterrupted _ = true;
      pcb_->pfnNotify(pcb_->pNotifyData );
      releaseCbk();
    }
    else
    {
      isInterrupted _ = false;
    }
  }

pcb_ is the resuming callback obtained usually as a result of:

pcb_ = ITHREAD_GetResumeCBK(thread_);

Now, if there is a callback pending, the suspended call is resumed:

pcb_->pfnNotify(pcb_->pNotifyData );

and isInterrupted is set to true. When yield() is checked against EINTR, actually it verifies whether or not isInterrupted was set to true. If this is true, the code exits normally via a break, the stack is unwound correctly, and the objects are properly destructed.

Other implementation details:

Threads are created and started in two distinct steps:

  typedef BThreadPP<RunnableImplSimple> BThrS;
  BThrS *t = BThrS::createThread(m_pIShell,rr_);
  int err = t->start();

A thread is identifiable by the associated ResourceRegistry id (basically, the thread position in the Registry container). There is a getIdD() method capable of returning the id of a thread after its creation.

All the operations on threads can be executed at a global or an instance level. Instance level is by id as described before. At the global level resources, can be grouped by type—analogous to getId() there is a getType(); For example threads have (an arbitrary) type 100. That's why all xxxAll (unregisterAll() is a notable exception) methods accept an additional type parameter.

"join" was implemented as a static member function and a specialized W_AEECBK structure wraps its callbacks in a RAII fashion. It accepts static PFNNOTIFY functions only. Passing member functions as entry points is not trivial, due to the fact that we have to make known the class type to BThreadT. A possible implementation can be found in BThreadT.h:

template <class Y, class T>
struct pkAEECBK

Other similar strategies can be used—one, accepting functions with one or more parameters, being presented for example in [1].

Using BThreadT:

Creating and starting a thread is simple:

typedef BThreadPP<RunnableImplSimple> BThrS;
  BThrS *t = BThrS::createThread(m_pIShell,rr_);
  int err = t->start();

where:

struct RunnableImplSimple
{
  RunnableImplSimple(IShell* shell) : shell_(shell), cursor_(0)
  {
  }
  void operator()(BThreadPP<RunnableImplSimple>* t)
  {
    Writer writer(shell_);
    while (cursor_ < 3000)
    {
      String s = String((long)cursor_);
      writer.DisplayOutput(1,s.toCharArray());
      if (cursor_ % 10 == 0) 
      {
        if (t->yield() == EINTR)
          break;
      }
      ++cursor_;
    }
  }
private:
  IShell* shell_;
  int cursor_;
};

The above code simply iterates a cursor, yielding every 10 counts. yield() is a mandatory action—is how BREW regains control and "pets" the watchdog. As described before, we check explicitly for interrupts (t->yield() == EINTR).

More complicated cases are provided in the tests, for example:

typedef BThreadPP<RunnableImplPP> BThrPP;
  typedef BThreadPP<RunnableImplPP1> BThrPP1;
  BThrPP *t = BThrPP::createThread(m_pIShell,rr_);
  int err = t->start();
  RunnableImplPP1* rim = new RunnableImplPP1(m_pIShell,t);
  BThrPP1 *t1 = BThrPP1::createThread(m_pIShell,rr_, rim);
  err = t1->start();

void operator()(BThreadPP<RunnableImplPP>* t)
  {
    Writer writer(shell_);
    while (cursor_ < 300)
    {
      String s = String((long)cursor_);
      writer.DisplayOutput(1,s.toCharArray());
      if (t->sleep(10) == EINTR)
        break;
      if (cursor_ == 30)
      {
        if (t->wait() == EINTR)
          break;
      }
      ++cursor_;
    }
    t->notifyAll();
  }

void operator()(BThreadPP<RunnableImplPP1>* t)
  {
    Writer writer(shell_);
    while (cursor_ > 0)
    {
      String s= String((long)cursor_);
      writer.DisplayOutput(4,s.toCharArray());
      if (t->sleep(15) == EINTR)
        break;
      if (tt_ && (cursor_ == 200) )
      {
        t->notify(tt_->getID());
      }
      if ((cursor_ == 100)&& (tt_) )
      {
        if (t->wait() == EINTR)
          break;
      }
      --cursor_;
    }
  }

This example starts two threads. The first one waits after 30 cycles, being explicitly notified by the second after 100 cycles. After that, the second one uses a wait/notifyAll mechanism to join the first one.

Future Enhancements

The current implementation is far from exhibiting the API of a normal thread. Interested users can easily adapt additional ThreadUtil functionality to be used by BThreadT or can create their own. For example, a Mutex (if ever needed!) might be easily implemented by using ResourceRegistry for cancellation.

Accompanying Source Code

CPPApp.zip

End Notes

1. Wolfgang Bangerth: "Starting Threads in a C++ Compatible Fashion," C/C++ Users Journal, October 2003.

About the Author

Radu Braniste is Director of Technology at Epicad. He can be contacted at rbraniste@epicad.com

# # #






Comment and Contribute

 


(Maximum characters: 1200). You have characters left.

 

 


Sitemap | Contact Us

Rocket Fuel