Monday, January 30, 2017

D - Concurrency

Concurrency is making a program run on more than one thread at a time. An example of a concurrent program would be a web server responding more than one client at the same time. Concurrency is easy with message passing but very difficult to write if they are based on data sharing.

Data that is passed between threads are called messages. Messages may be composed of any type and any number of variables. Every thread has an id, which is used for specifying recipients of messages. Any thread that starts another thread is called the owner of the new thread. Any thread that starts another thread is called the owner of the new thread.

Initiating threads

spawn() takes a function pointer as a parameter and starts a new thread from that function. Any operations that are carried out by that function, including other functions that it may call, would be executed on the new thread. The owner and the worker start executing separately as if they were independent programs:
import std.stdio;
import std.stdio;
import std.concurrency;
import core.thread;

void worker(int a)
{
   foreach (i; 0 .. 4)
   {
      Thread.sleep(1);
      writeln("Worker Thread ",a + i);
   }
}

void main()
{
   foreach (i; 1 .. 4)
   {
      Thread.sleep(2);
      writeln("Main Thread ",i);
      spawn(&worker, i * 5);
    }

    writeln("main is done.");

}
When the above code is compiled and executed, it reads the file created in previous section and produces the following result:
Main Thread 1
Worker Thread 5
Main Thread 2
Worker Thread 6
Worker Thread 10
Main Thread 3
main is done.
Worker Thread 7
Worker Thread 11
Worker Thread 15
Worker Thread 8
Worker Thread 12
Worker Thread 16
Worker Thread 13
Worker Thread 17
Worker Thread 18

Thread identifiers

The thisTid variable that is available globally at the module level is always the id of the current thread. Also you can receive the threadId when spawn is called. An example is shown below.
import std.stdio;
import std.concurrency;

void printTid(string tag)
{
   writefln("%s: %s, address: %s", tag, thisTid, &thisTid);
}

void worker()
{
   printTid("Worker");
}

void main()
{
   Tid myWorker = spawn(&worker);

   printTid("Owner ");

   writeln(myWorker);
}
When the above code is compiled and executed, it reads the file created in previous section and produces the following result:
Owner : Tid(std.concurrency.MessageBox), address: 10C71A59C
Worker: Tid(std.concurrency.MessageBox), address: 10C71A59C
Tid(std.concurrency.MessageBox)

Message Passing

send() sends messages and receiveOnly() waits for a message of a particular type. There is also prioritySend(), receive(), and receiveTimeout(), which will be explained later below.The owner in the following program sends its worker a message of type int and waits for a message from the worker of type double. The threads continue sending messages back and forth until the owner sends a negative int. An example is shown below.
import std.stdio;
import std.concurrency;
import core.thread;
import std.conv;

void workerFunc(Tid tid)
{
   int value = 0;

   while (value >= 0) 
   {
      value = receiveOnly!int();
      auto result = to!double(value) * 5;
      tid.send(result);
   } 

}

void main()
{
   Tid worker = spawn(&workerFunc,thisTid);
   
   foreach (value; 5 .. 10) {
      worker.send(value);
      auto result = receiveOnly!double();
      writefln("sent: %s, received: %s", value, result);
   }   
 
   worker.send(-1);
}
When the above code is compiled and executed, it reads the file created in previous section and produces the following result:
sent: 5, received: 25
sent: 6, received: 30
sent: 7, received: 35
sent: 8, received: 40
sent: 9, received: 45

Message Passing with wait

A simple example with the message passing with wait is shown below.
import std.stdio;
import std.concurrency;
import core.thread;
import std.conv;

void workerFunc(Tid tid)
{
   Thread.sleep(dur!("msecs")( 500 ),);
   tid.send("hello");
}

void main()
{
   spawn(&workerFunc,thisTid);

   writeln("Waiting for a message");

   bool received = false;

   while (!received) 
   {
      received = receiveTimeout(dur!("msecs")( 100 ),
      (string message){
         writeln("received: ", message);
      });

      if (!received) {
         writeln("... no message yet");
      } 
   }
}
When the above code is compiled and executed, it reads the file created in previous section and produces the following result:
Waiting for a message
... no message yet
... no message yet
... no message yet
... no message yet
received: hello

No comments:

Post a Comment