All workflows have access to the node topic exchanges. Each node can create one queue, and access to that queue is controlled by who has write access to the node. Queue names have a random number appended so that they are unique per replica instance; the replicas can then all listen on the same topic and act as workers. Set up a worker like this:
from mirmod import miranda
from mirmod.mq import register_queue


@wob.init()
def init(self):
  self.value = None

@wob.transmitter("value", "output")
def transmit_value(self):
  return self.value


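@wob.execute()
async def execute(self):
  ecx = miranda.get_execution_context()
  # Register this node's queue using the current execution context.
  qh = await register_queue(ecx)
  print("Producers should use topic=", qh.topic)
  # Wait for a message and keep it so the transmitter can pass it on.
  self.value = await qh.consume()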
Running this node will give you a topic (the same as the metadata_id of the consumer node). register_queue(execution_context, ...) can take the following named parameters:
Name | Type | Description
name | string | The name of the queue. Usually not something you change, because it is tied to how the application authenticates using node-level privileges.
topic | string | The topic to bind to.
durable | boolean | Whether the queue survives a broker restart.
exclusive | boolean | The queue is used by only one connection and is deleted when that connection closes.
auto_delete | boolean | A queue that has had at least one consumer is deleted when the last consumer unsubscribes.
message_ttl | int | How long a message can live if no consumer requests it.
max_length | int | Maximum length of a message.
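As a rough illustration of how the named parameters above might be collected before a call, here is a minimal sketch. `queue_options` is a hypothetical helper, not part of mirmod; it only checks the types listed in the table and leaves out anything you do not set, so that `register_queue` keeps its own defaults. The values are arbitrary examples, and the unit of `message_ttl` is not stated in this document.

```python
def queue_options(topic=None, durable=None, exclusive=None,
                  auto_delete=None, message_ttl=None, max_length=None):
  """Hypothetical helper: gather register_queue named parameters in a dict.

  `name` is left out on purpose, since it is usually not changed.
  """
  expected = {
    "topic": (topic, str),
    "durable": (durable, bool),
    "exclusive": (exclusive, bool),
    "auto_delete": (auto_delete, bool),
    "message_ttl": (message_ttl, int),
    "max_length": (max_length, int),
  }
  options = {}
  for key, (value, kind) in expected.items():
    if value is None:
      continue  # not set: let register_queue use its default
    # bool is a subclass of int in Python, so reject it for int fields.
    if kind is int and isinstance(value, bool):
      raise TypeError(f"{key} must be int, got bool")
    if not isinstance(value, kind):
      raise TypeError(f"{key} must be {kind.__name__}, got {type(value).__name__}")
    options[key] = value
  return options


# Example values only; 60000 is arbitrary and its unit is an assumption.
opts = queue_options(topic="42578", durable=True, message_ttl=60000)
print(opts)
```

Inside a node's `execute`, the resulting dict could then be passed along as `qh = await register_queue(ecx, **opts)`.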
Use this topic with the producer as follows:
from mirmod import miranda
from mirmod.mq import publish


@wob.init()
def init(self):
  self.value = None

@wob.transmitter("value", "output")
def transmit_value(self):
  return self.value


@wob.execute()
async def execute(self):
  topic = "42578" # The topic we got from the consumer node
  ecx = miranda.get_execution_context()
  await publish(ecx, message={"Hello": "world"}, topic=topic)
Start the consumer in a loop like this:
[Image: tool_description.png]
The iterator can look something like this:
@wob.transmitter("value", "output", is_field=True)
def transmit_value(self):
  return self.itr


@wob.execute()
async def execute(self):
  class DummyItr:
    def __init__(self):
      pass

    def __iter__(self):
      return self

    def __next__(self):
      return ":)" # loop forever

  self.itr = DummyItr()
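The iterator above never finishes. If the consumer should only run a fixed number of times, a bounded variant could look like the sketch below. `BoundedItr` and its `limit` argument are hypothetical names, and the sketch assumes the loop ends when the iterator raises `StopIteration`, as ordinary Python iteration does; this document does not confirm how the loop handles that.

```python
class BoundedItr:
  """Iterator that yields a fixed number of ticks and then stops."""

  def __init__(self, limit):
    self.limit = limit  # how many iterations to allow
    self.count = 0      # how many iterations have run so far

  def __iter__(self):
    return self

  def __next__(self):
    if self.count >= self.limit:
      raise StopIteration  # signals the end of the loop
    self.count += 1
    return self.count


# Plain-Python check of the behaviour, outside of any node.
ticks = list(BoundedItr(3))
print(ticks)
```

In the node, `self.itr = BoundedItr(3)` would take the place of `self.itr = DummyItr()`.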