Service API

import lymph


class Echo(lymph.Interface):

    @lymph.rpc()
    def echo(self, text=None):
        return text

    @lymph.rpc()
    def upper(self, text=None):
        self.emit('uppercase_transform_finished', {'text': text})
        return text.upper()

    @lymph.event('uppercase_transform_finished')
    def on_uppercase(self, text=None):
        print "done", text
class lymph.Interface
name

The interface identifier that is used to register this service with the coordinator service. name is an instance attribute which is taken from the config of the interface taken from the config of the interface.

on_start()

Called when the service is started

on_stop()

Called when the service is stopped

apply_config(config)
Parameters:config – dict

Called with instance specific configuration that is usually provided by a config file (see Metrics Configuration).

request(address, method, body)
Parameters:
  • address – the address where the request is sent to; either a ZeroMQ endpoint or a service name
  • method – the remote method that will be called
  • body – JSON serializable dict of parameters for the remote method
proxy(address)

returns a proxy object that can be used to conveniently send requests to another service.

echo = self.proxy('echo')
result = echo.upper(text='foo')
assert result == 'FOO'

This is equivalent to self.request('echo', 'echo.upper', text='foo').

emit(event_type, payload, delay=0)
Parameters:
  • event_type – str
  • payload – a dict of JSON serializable data structures
  • delay – delay delivery of this event by delay seconds
@subscribe(*event_types, sequential=True)

Behaves like lymph.event(), but can be used at runtime

class Example(lymph.Service):
    def on_start(self):
        @self.subscribe('dynamic_event_type')
        def on_event(event):
            assert isinstance(event, lymph.core.events.Event)
@lymph.raw_rpc

Marks the decorated interface method as an RPC method. Using this decorator the RPC function are expected to accept a ReplyChannel instance as a first argument.

import lymph

class Example(lymph.Interface):
    @lymph.raw_rpc()
    def do_something(self, channel, message):
        assert isinstance(channel, lymph.core.channels.ReplyChannel)
        assert isinstance(message, lymph.core.messages.Message)
        channel.ack()
@lymph.rpc

Marks the decorated interface method as an RPC method. The difference between this decorator and raw_rpc() is that the RPC functions must use return and raise like any normal Python function instead of using channel.reply and channel.error.

Parameters:raises – tuple of exception classes that the RPC function is expected to raise.
import lymph

class Example(lymph.Interface):
    @lymph.rpc()
    def do_something(self, message):
        return message
@lymph.event(*event_types, sequential=False)
Parameters:
  • event_types – may contain wildcards, e.g. 'subject.*'
  • sequential – force sequential event consumption
  • broadcast – receive every event in all instances

Marks the decorated interface method as an event handler. The service container will automatically subscribe to given event_types. If sequential=True, events will be not be consumed in parallel, but one by one. If broadcast=True, every instance of the service will receive the event.

import lymph

class Example(lymph.Interface):
    @lymph.event('task_done')
    def on_task_done(self, event):
        assert isinstance(event, lymph.core.events.Event)
@lymph.task
Parameters:
  • sequential – force sequential task execution per instance
  • broadcast – execute the task in all instances

Marks the decorated interface method as a task handler.