ConcurrentLua

Concurrency Oriented Programming in Lua

Introduction

ConcurrentLua is an implementation of the share-nothing asynchronous message-passing model that is employed in the Erlang programming language. It adapts the Erlang concurrency primitives so that they integrate closely with Lua.

One of the core elements of ConcurrentLua is the process. A process is a light-weight VM thread that plays the same role as a process in an operating system: processes don't share memory but instead communicate through some kind of interprocess communication. Processes can be created and destroyed on demand, and a simple round-robin scheduler passes control to them.

Each process is associated with a mailbox, a message queue for the temporary storage of messages that were sent to the process. A process can check its mailbox for new messages at any time, and if there are any, they can be read in the order of arrival.

Each process is identified by a unique numeric process identifier, or PID. In addition, aliases or process names can be used instead of PIDs in order to refer to processes. These aliases and their references are stored in a central repository, the registry. Processes can edit the registry by adding or deleting entries.

Error handling mechanisms are also provided in the form of monitors and links. With monitors, a process can watch other processes and get notified if a monitored process terminates abnormally. With links, processes are bound together, and when one of them terminates abnormally the other one is signalled and terminates, too.

This system also supports distributed programming and all the properties that have been described map naturally onto a distributed system. Distributed processes communicate with the same primitives as local processes.

Distribution is based on a component that is called the node. A node represents a system runtime inside of which processes are executing. Nodes can be connected to each other and communicate, thus forming a virtual network. Distributed processes use this network in order to exchange messages.

Each node has a name associated with it. In order for nodes to connect to each other by using only this name, a port mapper daemon acts as a nameserver. The port mapper daemon has details about the nodes running on the network host that the daemon itself is bound to.

Just as processes can be created locally, it is also possible to request the creation of processes on remote nodes. A remote process can then be handled as if it were a local process.

If the nodes that form the virtual network are fully connected (every node is connected bidirectionally to every other node), global aliases can be used for the processes. The nodes negotiate and maintain a virtual global registry and also keep updated local copies of the registry.

Monitors and links for distributed processes are supported with the same semantics as for local processes. Nodes take care of the task of transparently handling errors between distributed processes. In addition, it is possible for processes to monitor nodes as a whole.

Nodes are required to authenticate before they can communicate. An authenticated node can then be part of the virtual network that the nodes form. A simple security mechanism takes care of this task.

Implementation

The implementation of ConcurrentLua is based on the Lua component system. The system is organized as a collection of Lua modules and submodules. There are two main modules, which provide the concurrent and the distributed programming functionality respectively. It is possible to load only the concurrency module, and for each module there is the option of not loading some of its submodules if the functionality they provide is not needed. A stand-alone port mapper daemon utility is also included.

The processes in the system are implemented with Lua coroutines. A process is actually a Lua coroutine that yields control when the process suspends its execution and resumes control when the process continues its execution.
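The yield and resume cycle described above can be sketched with nothing but the standard coroutine library. This is a minimal illustration, not ConcurrentLua code; the function name run_worker and the log format are made up for the example:

```lua
-- Illustrative only: a "process" modelled as a plain Lua coroutine.
function run_worker(steps)
    local log = {}
    local co = coroutine.create(function(name)
        for i = 1, steps do
            log[#log + 1] = name .. ' step ' .. i
            coroutine.yield()      -- suspend and hand control back
        end
    end)
    -- Keep resuming until the coroutine has run to completion.
    local resumes = 0
    while coroutine.status(co) ~= 'dead' do
        coroutine.resume(co, 'p1')
        resumes = resumes + 1
    end
    return log, resumes
end

local log, resumes = run_worker(2)
print(table.concat(log, ', '), resumes)
```

The worker yields once per step, so two steps take three resumes: two that stop at a yield and a final one that lets the function return.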

The scheduling of the processes is still based on the cooperative multithreading model that Lua uses. Processes voluntarily suspend their execution and thus other processes get the chance to run. Nevertheless, the suspending and resuming of processes is partly hidden under a higher level mechanism; a process suspends its execution when waiting for a message to arrive and becomes ready to be resumed when new messages have arrived in its mailbox. A simple round-robin scheduler resumes the processes.
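As a rough sketch of this mechanism, the toy scheduler below suspends a process inside receive() while its mailbox is empty and resumes it once a message has been queued. It is an assumption-laden illustration, not the ConcurrentLua source; the table mini and all of its fields are hypothetical names:

```lua
-- Illustrative only: a toy round-robin scheduler, not the ConcurrentLua source.
local unpack = table.unpack or unpack   -- Lua 5.1 / 5.2+ compatibility

mini = { procs = {}, boxes = {}, waiting = {}, current = nil }

function mini.spawn(body, ...)
    local args = { ... }
    local pid = #mini.procs + 1
    mini.procs[pid] = coroutine.create(function() body(unpack(args)) end)
    mini.boxes[pid] = {}
    return pid
end

function mini.send(pid, msg)
    table.insert(mini.boxes[pid], msg)   -- asynchronous: just enqueue
end

function mini.receive()
    local box = mini.boxes[mini.current]
    while #box == 0 do
        coroutine.yield('waiting')       -- suspend until a message arrives
    end
    return table.remove(box, 1)          -- oldest message first
end

function mini.loop()
    local ran = true
    while ran do                         -- stop when nothing is runnable
        ran = false
        for pid, co in ipairs(mini.procs) do
            local runnable = coroutine.status(co) == 'suspended'
                and (not mini.waiting[pid] or #mini.boxes[pid] > 0)
            if runnable then
                mini.current = pid
                local ok, state = coroutine.resume(co)
                mini.waiting[pid] = ok and state == 'waiting'
                ran = true
            end
        end
    end
end

-- Usage: the first process blocks in receive() until the second one sends.
local out = {}
local echo = mini.spawn(function()
    out[#out + 1] = 'echo got ' .. mini.receive()
end)
mini.spawn(function(target)
    mini.send(target, 'hi')
    out[#out + 1] = 'sender done'
end, echo)
mini.loop()
print(table.concat(out, '; '))
```

Unlike the real system's loop, this sketch returns as soon as no process is runnable, which keeps the example finite.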

Any type of Lua data, with the exception of memory references, can be sent inside messages. Messages can be booleans, numbers, strings, tables or functions, and any combination of them. Data are automatically serialized on send and deserialized on receive, and everything is passed by value.
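To make the pass-by-value behaviour concrete, here is a minimal sketch of one way to serialize and deserialize a message. It is not the serializer that ConcurrentLua uses; serialize() and deserialize() are hypothetical helpers that handle only booleans, finite numbers, strings and acyclic tables, and that evaluate the serialized text as Lua source, which is only safe for trusted data:

```lua
-- Illustrative only: a tiny serializer for booleans, numbers, strings and
-- nested tables. Functions and cyclic tables are not handled here.
function serialize(v)
    local t = type(v)
    if t == 'number' or t == 'boolean' then
        return tostring(v)
    elseif t == 'string' then
        return string.format('%q', v)    -- quoted, escaped Lua literal
    elseif t == 'table' then
        local parts = {}
        for k, x in pairs(v) do
            parts[#parts + 1] = '[' .. serialize(k) .. ']=' .. serialize(x)
        end
        return '{' .. table.concat(parts, ',') .. '}'
    end
    error('cannot serialize a ' .. t)
end

function deserialize(s)
    local loader = loadstring or load    -- Lua 5.1 vs 5.2+
    return assert(loader('return ' .. s))()
end

-- Usage: the receiver gets a copy, so changing it leaves the original alone.
local original = { body = 'ping', hops = { 1, 2 } }
local copy = deserialize(serialize(original))
copy.hops[1] = 99
print(original.hops[1], copy.hops[1])
```

Because the message is rebuilt from text on the receiving side, the two tables share no state, which is the property the share-nothing model relies on.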

Interprocess communication between nodes, and consequently between distributed processes, is based on an asynchronous socket handler. This translates to a networking model that uses non-blocking sockets and periodic polling. This is the approach most commonly used today by Lua libraries. Non-blocking semantics should also be used for other IO, such as files and pipes.

Usage

Some examples will provide an introduction to the most essential properties of the system, from process creation and message passing to distributed programming and error handling.

Creating processes

Processes are created using the spawn() function. The spawn() function takes at least one argument: the function that contains the command set that the process will execute. Any additional arguments are passed directly as arguments of that function.

The following example demonstrates the creation of a process. The process just prints a message as many times as specified:

require 'concurrent'

function hello_world(times)
    for i = 1, times do print('hello world') end
    print('done')
end

concurrent.spawn(hello_world, 3)

concurrent.loop()

The output would be:

hello world
hello world
hello world
done

First the system is loaded:

require 'concurrent'

The function that the process will execute is defined next:

function hello_world(times)
    for i = 1, times do print('hello world') end
    print('done')
end

A new process is created:

concurrent.spawn(hello_world, 3)

The system's infinite loop is called last:

concurrent.loop()

Exchanging messages

Processes can exchange messages by using the send() and receive() functions. Also, the self() function can be used to get the PID of the calling process.

The following program implements two processes that exchange messages and then terminate:

require 'concurrent'

function pong()
    while true do
        local msg = concurrent.receive()
        if msg.body == 'finished' then
            break
        elseif msg.body == 'ping' then
            print('pong received ping')
            concurrent.send(msg.from, { body = 'pong' })
        end
    end
    print('pong finished')
end

function ping(n, pid)
    for i = 1, n do
        concurrent.send(pid, {
            from = concurrent.self(),
            body = 'ping'
        })
        local msg = concurrent.receive()
        if msg.body == 'pong' then
            print('ping received pong')
        end
    end
    concurrent.send(pid, {
        from = concurrent.self(),
        body = 'finished'
    })
    print('ping finished')
end

pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)

concurrent.loop()

The output would be:

pong received ping
ping received pong
pong received ping
ping received pong
pong received ping
ping received pong
pong finished
ping finished

After the pong process is created, the ping process is supplied with the PID of the pong process:

pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)

The ping process sends a message:

concurrent.send(pid, {
    from = concurrent.self(),
    body = 'ping'
})

The pong process waits for a message to arrive and saves it in a variable when it does:

local msg = concurrent.receive()

The pong process replies:

concurrent.send(msg.from, { body = 'pong' })

The pong process terminates after having received a notification from the ping process.

Registering process names

Instead of using process PIDs for sending messages, process names can also be used. The register() function can be used to create an alias for a process in the registry:

require 'concurrent'

function pong()
    while true do
        local msg = concurrent.receive()
        if msg.body == 'finished' then
            break
        elseif msg.body == 'ping' then
            print('pong received ping')
            concurrent.send(msg.from, { body = 'pong' })
        end
    end
    print('pong finished')
end

function ping(n)
    for i = 1, n do
        concurrent.send('pong', {
            from = concurrent.self(),
            body = 'ping'
        })
        local msg = concurrent.receive()
        if msg.body == 'pong' then
            print('ping received pong')
        end
    end
    concurrent.send('pong', {
        from = concurrent.self(),
        body = 'finished'
    })
    print('ping finished')
end

pid = concurrent.spawn(pong)
concurrent.register('pong', pid)
concurrent.spawn(ping, 3)

concurrent.loop()

The only change from the previous example is the destination that the ping process sends messages to:

concurrent.send('pong', {
    from = concurrent.self(),
    body = 'ping'
})

And:

concurrent.send('pong', {
    from = concurrent.self(),
    body = 'finished'
})

And the pong process is now registered under a name:

concurrent.register('pong', pid)

Therefore the ping process isn't supplied with the PID of the pong process.
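Conceptually, sending to a name only adds a lookup step before delivery. The following sketch shows one way such a registry could resolve a destination that is either a PID or a name. It is an illustration under assumptions, not the library's implementation; toy_registry and its functions are hypothetical:

```lua
-- Illustrative only: a name registry kept in a plain table.
toy_registry = { names = {} }

function toy_registry.register(name, pid)
    if toy_registry.names[name] then return false end  -- name already taken
    toy_registry.names[name] = pid
    return true
end

function toy_registry.unregister(name)
    toy_registry.names[name] = nil
end

-- Accept either a PID (number) or a registered name (string).
function toy_registry.resolve(dest)
    if type(dest) == 'number' then return dest end
    return toy_registry.names[dest]
end

-- Usage: a name and a PID resolve to the same destination.
toy_registry.register('pong', 1)
print(toy_registry.resolve('pong'), toy_registry.resolve(1))
toy_registry.unregister('pong')
print(toy_registry.resolve('pong'))
```

Refusing to overwrite an existing name is a design choice of this sketch; the source does not say how the real registry treats duplicates.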

Distributed message passing

Processes in different nodes can still communicate with the same message passing primitives. Remote processes are denoted by their PID or alias and the node they are executing under. The previous example could be broken into two programs, one for each process.

The code for the pong process:

require 'concurrent'

function pong()
    while true do
        local msg = concurrent.receive()
        if msg.body == 'finished' then
            break
        elseif msg.body == 'ping' then
            print('pong received ping')
            concurrent.send(msg.from, { body = 'pong' })
        end
    end
    print('pong finished')
end

concurrent.init('pong@gaia')

pid = concurrent.spawn(pong)

concurrent.register('pong', pid)
concurrent.loop()
concurrent.shutdown()

And the code for the ping process:

require 'concurrent'

function ping(n)
    for i = 1, n do
        concurrent.send({ 'pong', 'pong@gaia' }, {
            from = { concurrent.self(), concurrent.node() },
            body = 'ping'
        })
        local msg = concurrent.receive()
        if msg.body == 'pong' then
            print('ping received pong')
        end
    end
    concurrent.send({ 'pong', 'pong@gaia' }, {
        from = { concurrent.self(), concurrent.node() },
        body = 'finished'
    })
    print('ping finished')
end

concurrent.spawn(ping, 3)

concurrent.init('ping@selene')
concurrent.loop()
concurrent.shutdown()

The output of the pong process would be:

pong received ping
pong received ping
pong received ping
pong finished

And the output of the ping process would be:

ping received pong
ping received pong
ping received pong
ping finished

In this example the runtime system is running in distributed mode. In order for this to happen, the port mapper daemon has to be started first. This can be done by typing in a command line shell:

$ clpmd

The code that initializes the node that the pong process is running on:

concurrent.init('pong@gaia')

And the code for the ping process:

concurrent.init('ping@selene')

The previous two code snippets register with the port mapper daemon the port that each node is listening on. Both nodes unregister their port with:

concurrent.shutdown()
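The bookkeeping behind this registration can be pictured as a small table that maps node names to ports. The sketch below is purely illustrative and says nothing about how clpmd actually works: the portmap table, its functions, the port number, and the assumption that a node name has the form name@host are all hypothetical:

```lua
-- Illustrative only: a toy port mapper kept in a table.
portmap = { entries = {} }

-- Split 'pong@gaia' into 'pong' and 'gaia' (assumed name@host convention).
function portmap.split(node)
    local name, host = string.match(node, '^([^@]+)@([^@]+)$')
    return name, host
end

function portmap.register(node, port)
    local name = portmap.split(node)
    if not name then return false end    -- malformed node name
    portmap.entries[name] = port
    return true
end

function portmap.unregister(node)
    local name = portmap.split(node)
    if name then portmap.entries[name] = nil end
end

function portmap.lookup(node)
    local name = portmap.split(node)
    return name and portmap.entries[name]
end

-- Usage: register on init, look up when connecting, unregister on shutdown.
portmap.register('pong@gaia', 9001)      -- 9001 is an arbitrary example port
print(portmap.lookup('pong@gaia'))
portmap.unregister('pong@gaia')
print(portmap.lookup('pong@gaia'))
```

Only the part before the @ is used as the key here, on the assumption that each host runs its own mapper.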

The only other changes in this example are the destination that the messages are sent to, along with the introduction of the node() function that returns the name of the node that the calling process is running on:

concurrent.send({ 'pong', 'pong@gaia' }, {
    from = { concurrent.self(), concurrent.node() },
    body = 'ping'
})

And later:

concurrent.send({ 'pong', 'pong@gaia' }, {
    from = { concurrent.self(), concurrent.node() },
    body = 'finished'
})

Handling errors

One approach to handling errors in processes is the notion of linked processes. Two processes are bound together, and if one of them terminates abnormally the other one terminates, too. The link() function can be used to link processes:

require 'concurrent'

function ping(n, pid)
    concurrent.link(pid)
    for i = 1, n do
        concurrent.send(pid, {
            from = concurrent.self(),
            body = 'ping'
        })
        local msg = concurrent.receive()
        if msg.body == 'pong' then
            print('ping received pong')
        end
    end
    print('ping finished')
    concurrent.exit('finished')
end

function pong()
    while true do
        local msg = concurrent.receive()
        if msg.body == 'ping' then
            print('pong received ping')
            concurrent.send(msg.from, { body = 'pong' })
        end
    end
    print('pong finished')
end

pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)

concurrent.loop()

The output would be:

pong received ping
ping received pong
pong received ping
ping received pong
pong received ping
ping received pong
ping finished

The pong process never reaches its last line, because it terminates when the ping process exits.

The code that links the processes is:

concurrent.link(pid)

The exit() function is used to make the calling process quit abnormally:

concurrent.exit('finished')

It is also possible to trap the exit signal of the terminating process. In this case a special message is received:

require 'concurrent'

concurrent.setoption('trapexit', true)

function pong()
    while true do
        local msg = concurrent.receive()
        if msg.signal == 'EXIT' then
            break
        elseif msg.body == 'ping' then
            print('pong received ping')
            concurrent.send(msg.from, { body = 'pong' })
        end
    end
    print('pong finished')
end

function ping(n, pid)
    concurrent.link(pid)
    for i = 1, n do
        concurrent.send(pid, {
            from = concurrent.self(),
            body = 'ping'
        })
        local msg = concurrent.receive()
        if msg.body == 'pong' then
            print('ping received pong')
        end
    end
    print('ping finished')
    concurrent.exit('finished')
end

pid = concurrent.spawn(pong)
concurrent.spawn(ping, 3, pid)

concurrent.loop()

The output would be:

pong received ping
ping received pong
pong received ping
ping received pong
pong received ping
ping received pong
ping finished
pong finished

There is an option related to process linking that can be set with the setoption() function, specifically the trapexit option:

concurrent.setoption('trapexit', true)

Then the pong process receives a special exit message:

if msg.signal == 'EXIT' then
    break
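The difference between the two linking examples can be summarized in a small model of how an exit signal might travel along links. This is a sketch under assumptions, not the ConcurrentLua implementation: the toy table is hypothetical, trapping is modelled per process here although the setoption() call in the example sets it once for the whole program, and the from and reason fields of the EXIT message are assumed:

```lua
-- Illustrative only: how an abnormal exit might travel along links.
toy = { links = {}, alive = {}, mailbox = {}, trapexit = {} }

function toy.spawn(pid)
    toy.alive[pid], toy.links[pid], toy.mailbox[pid] = true, {}, {}
end

function toy.link(a, b)                  -- links are bidirectional
    toy.links[a][b], toy.links[b][a] = true, true
end

function toy.exit(pid, reason)
    if not toy.alive[pid] then return end
    toy.alive[pid] = false
    for peer in pairs(toy.links[pid]) do
        toy.links[peer][pid] = nil       -- the link dies with the process
        if toy.trapexit[peer] then
            -- Trapping: the signal becomes an ordinary message.
            table.insert(toy.mailbox[peer],
                { signal = 'EXIT', from = pid, reason = reason })
        else
            -- Not trapping: the peer terminates as well.
            toy.exit(peer, reason)
        end
    end
end

-- Usage: process 2 dies with process 1, process 3 traps the signal instead.
toy.spawn(1); toy.spawn(2); toy.spawn(3)
toy.link(1, 2); toy.link(1, 3)
toy.trapexit[3] = true
toy.exit(1, 'finished')
print(toy.alive[2], toy.alive[3], toy.mailbox[3][1].signal)
```

A trapping process stays alive and decides for itself what to do with the EXIT message, which is what the pong process does when it breaks out of its loop.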

Alternatively, monitors, which are based on notification messages, can also be used for error handling.