Events tasks

To create a task that is triggered by events, follow this example:

from urllib.parse import urlparse

from core import taskmanager
from core.events.message import EventMessage
from core.schemas import observable, task

class HostnameExtract(task.EventTask):
    _defaults = {
        "name": "HostnameExtract",
        "description": "Extract hostname (domain or ip) from new URL observable.",
        "acts_on": "(new|update):observable:url",
    }

    def run(self, message: EventMessage) -> None:
        url = message.event.yeti_object
        self.logger.info(f"Extracting hostname from: {url.value}")
        o = urlparse(url.value)
        if observable.IPv4.is_valid(o.hostname):
            extracted_obs = observable.IPv4(value=o.hostname).save()
        else:
            extracted_obs = observable.Hostname(value=o.hostname).save()
        url.link_to(extracted_obs, "hostname", "Extracted hostname from URL")
        return

taskmanager.TaskManager.register_task(HostnameExtract)

The task must inherit from task.EventTask and define a _defaults dictionary that sets its name, its description, and the events it acts on.

When the consumer matches an event against a task's acts_on, it calls the task's run method with the event message as its argument.
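The matching step can be pictured as a regular-expression match against an event string. The sketch below is only an illustration, not the consumer's actual code: it assumes acts_on is treated as a regular expression and that event strings look like new:observable:url. The helper task_matches is hypothetical.

```python
import re

# Pattern copied from the example task's _defaults["acts_on"].
ACTS_ON = "(new|update):observable:url"


def task_matches(acts_on: str, event_name: str) -> bool:
    """Hypothetical helper: True if the event string matches the pattern."""
    # re.match anchors at the start of the string only.
    return re.match(acts_on, event_name) is not None


# Assumed event strings, for illustration only.
for name in (
    "new:observable:url",
    "update:observable:url",
    "delete:observable:url",
    "new:observable:ipv4",
):
    print(name, task_matches(ACTS_ON, name))
```

Under these assumptions, only the first two event strings would select the task.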

In this example, the task always receives an EventMessage whose event is of type ObjectEvent, because its acts_on only matches events produced by ObjectEvent.

When implementing a task with a more generic acts_on, the task is responsible for handling the different event types it can receive.
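One way to handle several event types is to branch on the concrete type of the event inside run. The sketch below uses stand-in dataclasses so that it runs without the project installed. LinkEvent and its fields, the fields of the stand-in ObjectEvent, and the describe function are all assumptions made for illustration; the real classes may look different.

```python
from dataclasses import dataclass


# Stand-in event classes so the sketch runs on its own.
@dataclass
class ObjectEvent:
    type: str
    yeti_object: str


@dataclass
class LinkEvent:  # assumed event type, not named in the text above
    type: str
    source: str
    target: str


@dataclass
class EventMessage:
    event: object


def describe(message: EventMessage) -> str:
    """Branch on the event type, as a generic task's run() might."""
    event = message.event
    if isinstance(event, ObjectEvent):
        return f"object {event.type}: {event.yeti_object}"
    if isinstance(event, LinkEvent):
        return f"link {event.type}: {event.source} -> {event.target}"
    # Unknown event types are skipped rather than raising.
    return "ignored"


print(describe(EventMessage(ObjectEvent("new", "http://example.com"))))
print(describe(EventMessage(LinkEvent("new", "url", "hostname"))))
```

Returning early for unknown types keeps a broad acts_on from crashing the task when a new kind of event appears.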

Testing

Producer / Consumer

To test the new event task, start the consumer in a new shell:

poetry run python -m core.events.consumer events --debug

Then, in a Python shell, trigger the producer by saving an observable:

>>> from core.schemas import observable
>>> observable.Url(value="http://example.com").save()
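The parsing step the task relies on can also be checked in isolation, without a running consumer. This is a minimal sketch using only the standard library. The function classify_host is hypothetical, and the ipaddress module stands in for observable.IPv4.is_valid, whose exact validation rules may differ.

```python
import ipaddress
from urllib.parse import urlparse


def classify_host(url: str) -> tuple[str, str]:
    """Return (kind, hostname), where kind is "ipv4" or "hostname"."""
    hostname = urlparse(url).hostname
    if hostname is None:
        # urlparse finds no hostname when the URL has no "//" part.
        raise ValueError(f"no hostname in {url!r}")
    try:
        ipaddress.IPv4Address(hostname)
        return "ipv4", hostname
    except ValueError:
        return "hostname", hostname


print(classify_host("http://example.com"))
print(classify_host("http://192.0.2.10:8080/path"))
```

The None check matters because urlparse returns no hostname for input such as example.com without a scheme, a case the example task does not guard against.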