All workflows have access to the nodes' topic exchanges. Each node can create one queue, and access to this queue is controlled by who has write access to the node. Each queue name has a random number appended so that it is unique per replica instance. The replicas can all listen on the same topic and act as workers.
Set up a worker like this:
from mirmod import miranda
from mirmod.mq import register_queue

@wob.init()
def init(self):
    self.value = None

@wob.transmitter("value", "output")
def transmit_value(self):
    return self.value

@wob.execute()
async def execute(self):
    ecx = miranda.get_execution_context()
    qh = await register_queue(ecx)
    print("Producers should use topic=", qh.topic)
    self.value = await qh.consume()
Running this node will give you a topic (the same as the metadata_id of the consumer node).
register_queue(execution_context, ...) can take the following named parameters:
| Name | Type | Description |
|---|---|---|
| name | string | The name of the queue. Usually not something you change, because it is tied to how the application authenticates using node-level privileges. |
| topic | string | The topic to bind to. |
| durable | boolean | Whether the queue survives a broker restart. |
| exclusive | boolean | The queue is used by only one connection and is deleted when that connection closes. |
| auto_delete | boolean | A queue that has had at least one consumer is deleted when the last consumer unsubscribes. |
| message_ttl | int | How long a message can live if no consumer requests it. |
| max_length | int | Max length of a message. |
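
As a rough illustration of how these named parameters might be assembled before the call, the sketch below checks a set of options against the names and types listed in the table. The helper `queue_options` and the constant `QUEUE_PARAMS` are hypothetical and not part of `mirmod`; only the parameter names and types come from the table, and the value used for `message_ttl` is arbitrary because the table does not state its unit.

```python
# Names and types copied from the parameter table above.
# The helper itself is hypothetical and not part of mirmod.
QUEUE_PARAMS = {
    "name": str,
    "topic": str,
    "durable": bool,
    "exclusive": bool,
    "auto_delete": bool,
    "message_ttl": int,
    "max_length": int,
}


def queue_options(**kwargs):
    """Return the options as a dict after checking names and types."""
    for key, value in kwargs.items():
        if key not in QUEUE_PARAMS:
            raise TypeError(f"unknown queue parameter: {key}")
        expected = QUEUE_PARAMS[key]
        # bool is a subclass of int in Python, so reject True/False for int fields.
        if expected is int and isinstance(value, bool):
            raise TypeError(f"{key} must be int, not bool")
        if not isinstance(value, expected):
            raise TypeError(f"{key} must be {expected.__name__}")
    return dict(kwargs)


# The unit of message_ttl is not stated in the table; 60 is a placeholder value.
opts = queue_options(durable=True, auto_delete=False, message_ttl=60)

# Inside a node's execute(), the options would then be passed as named parameters:
#   qh = await register_queue(ecx, **opts)
```

Checking the names up front turns a misspelled option such as `ttl` into an immediate `TypeError` instead of a queue that silently uses defaults.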
Use this topic with the producer as follows:
from mirmod import miranda
from mirmod.mq import publish

@wob.init()
def init(self):
    self.value = None

@wob.transmitter("value", "output")
def transmit_value(self):
    return self.value

@wob.execute()
async def execute(self):
    topic = "42578"  # The topic we got from the consumer node
    ecx = miranda.get_execution_context()
    await publish(ecx, message={"Hello": "world"}, topic=topic)
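
To see the consumer and producer round trip without a running broker, the following sketch replaces the queue with an in-memory `asyncio.Queue`. The class `LocalTopic` and the functions `consumer`, `producer`, and `main` are hypothetical stand-ins and not part of `mirmod`; they only mirror the shape of `qh.consume()` and `publish(...)` from the examples above and say nothing about how the real exchange routes messages.

```python
import asyncio


class LocalTopic:
    """Hypothetical in-memory stand-in for one queue bound to one topic."""

    def __init__(self, topic):
        self.topic = topic
        self._queue = asyncio.Queue()

    async def publish(self, message):
        await self._queue.put(message)

    async def consume(self):
        # Suspends until a message is available, then returns it.
        return await self._queue.get()


async def consumer(qh):
    # Mirrors `self.value = await qh.consume()` in the consumer node.
    return await qh.consume()


async def producer(qh):
    # Mirrors `await publish(ecx, message=..., topic=topic)` in the producer node.
    await qh.publish({"Hello": "world"})


async def main():
    qh = LocalTopic("42578")
    # Start the consumer first so it is already scheduled when the message is sent.
    waiting = asyncio.create_task(consumer(qh))
    await producer(qh)
    return await waiting


received = asyncio.run(main())
print(received)
```

The point of the sketch is that `consume()` is awaited: the consumer node does not finish its `execute()` until a producer has published something to its topic.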
To keep receiving messages, start the consumer in a loop driven by an iterator node. The iterator can look something like this:
@wob.transmitter("value", "output", is_field=True)
def transmit_value(self):
    return self.itr

@wob.execute()
async def execute(self):
    class DummyItr:
        def __init__(self):
            pass

        def __iter__(self):
            return self

        def __next__(self):
            return ":)"  # loop forever

    self.itr = DummyItr()
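
`DummyItr` never raises `StopIteration`, so anything iterating over it keeps going until it is stopped from outside. For a bounded run, for example while testing a workflow, a variant along these lines could be used instead. `BoundedItr` and its `limit` argument are hypothetical names, and the sketch only demonstrates the standard Python iterator protocol; how the runtime consumes the iterator is not covered here.

```python
class BoundedItr:
    """Hypothetical variant of DummyItr that stops after `limit` items."""

    def __init__(self, limit):
        self.limit = limit
        self.count = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.count >= self.limit:
            # Signals to the consuming loop that there is nothing left.
            raise StopIteration
        self.count += 1
        return ":)"


# Collect the items to show that iteration ends after three steps.
items = list(BoundedItr(3))
print(items)
```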