# Events bus
The system relies on the Kombu messaging library, the same library that underlies message passing in Celery. It aims to keep things simple by providing an event producer that publishes new messages to the `events` or `logs` queues, and consumers that call the relevant tasks of type `EventTask` or `LogTask` accordingly.
## Messages
Message formats are validated using pydantic models defined in `core/events/message.py`:
```python
class MessageType(str, Enum):
    log = "log"
    event = "event"


class EventType(str, Enum):
    new = "new"
    update = "update"
    delete = "delete"


class AbstractEvent(BaseModel, abc.ABC):
    def match(self, acts_on: Pattern) -> bool:
        raise NotImplementedError


class ObjectEvent(AbstractEvent):
    type: EventType
    yeti_object: YetiObjectTypes
    [...]


class LinkEvent(AbstractEvent):
    type: EventType
    source_object: YetiObjectTypes
    target_object: YetiObjectTypes
    relationship: "graph.Relationship"
    [...]


class TagEvent(AbstractEvent):
    type: EventType
    tagged_object: YetiObjectTypes
    tag_object: "tag.Tag"
    [...]


class AbstractMessage(BaseModel, abc.ABC):
    type: MessageType
    timestamp: datetime.datetime = Field(default_factory=now)


class LogMessage(AbstractMessage):
    type: MessageType = MessageType.log
    log: str | dict


EventTypes = Union[ObjectEvent, LinkEvent, TagEvent]


class EventMessage(AbstractMessage):
    type: MessageType = MessageType.event
    event: EventTypes
```
`YetiObjectTypes` supports all schemas defined in Yeti (even private ones). For example, in an `ObjectEvent` message, `yeti_object` could be a campaign (entity), an ipv4 (observable), a regex (indicator), …
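As a self-contained illustration of the message envelope above, here is a minimal stand-in using stdlib dataclasses instead of pydantic; any field values and the example object shape are assumptions for illustration only:

```python
import datetime
from dataclasses import dataclass, field
from enum import Enum


# Stdlib stand-in for the pydantic models above, for illustration only.
class MessageType(str, Enum):
    log = "log"
    event = "event"


class EventType(str, Enum):
    new = "new"
    update = "update"
    delete = "delete"


@dataclass
class ObjectEvent:
    type: EventType
    yeti_object: dict  # stand-in for YetiObjectTypes


@dataclass
class EventMessage:
    event: ObjectEvent
    type: MessageType = MessageType.event
    timestamp: datetime.datetime = field(default_factory=datetime.datetime.now)


# A "new ipv4 observable" event wrapped in its message envelope.
msg = EventMessage(event=ObjectEvent(EventType.new, {"value": "1.2.3.4", "type": "ipv4"}))
```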
## Producer
To publish an event message, call the producer's `publish_event` method with an event message argument of type `ObjectEvent`, `LinkEvent` or `TagEvent`:

```python
from core.events.producer import producer

producer.publish_event(event: EventTypes)
```
To publish a log message, call the producer's `publish_log` method, which accepts either a `str` or a `dict`:

```python
from core.events.producer import producer

producer.publish_log(log: str | dict)
```
The call to the publish method is non-blocking: it simply sends the message to the configured exchange and queue (`events` and `logs` respectively).
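The non-blocking publish pattern can be sketched in-memory with the stdlib; this is not Yeti's actual implementation (which uses Kombu), and the function and queue names are illustrative, mirroring the events/logs split:

```python
import json
import queue

# In-memory stand-ins for the events and logs queues.
events_queue: queue.Queue = queue.Queue()
logs_queue: queue.Queue = queue.Queue()


def publish(message: dict) -> None:
    """Serialize the message and hand it off without waiting on a consumer."""
    target = events_queue if message.get("type") == "event" else logs_queue
    target.put_nowait(json.dumps(message))  # returns immediately


publish({"type": "log", "log": "consumer started"})
publish({"type": "event", "event": {"type": "new"}})
```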
## Queues management
When no events or logs consumers are running, the Redis queues grow indefinitely, which eventually leads to an OOM kill of the service. Instead of relying on queue length, the producer relies on the memory usage of the queue: if it exceeds a configured threshold, the queue is trimmed to keep only the most recent `keep_ratio` fraction of messages.
### Memory limit
The memory limit is used as the threshold for trimming the message queue. It is configured with the `memory_limit` key under the `[events]` section. If not configured, it falls back to 64 MiB; if configured lower than 64 MiB, it also falls back to 64 MiB. For example, set `memory_limit` to 96 for a 96 MiB threshold.

### Keep ratio
When the memory limit is reached, the message queue is trimmed by removing the oldest messages. `keep_ratio` defines how many messages must be kept in the queue: it is a float strictly between 0 and 1, configured with the `keep_ratio` key under the `[events]` section. If not configured, it falls back to `0.9`, meaning that the oldest 10% of the messages in the queue are removed. If the configured `keep_ratio` is less than or equal to 0, or greater than or equal to 1, it falls back to `0.9`.
## Consumers
To receive and process messages, consumers must be created by specifying which queue they should consume from.

To handle events messages:

```bash
python -m core.events.consumers events
```

To handle logs messages:

```bash
python -m core.events.consumers logs
```
By default, the consumer spawns one `multiprocessing.Process` per available CPU, as reported by `multiprocessing.cpu_count()`. To change the number of spawned processes, pass the `--concurrency` argument followed by the desired number of processes.
## Events message routing
Plugins of type `EventTask` can define an `acts_on` attribute, a regex used to match events. For every event message, the `EventWorker` calls the `match` method implemented by the `EventTypes` classes with `acts_on`.

For example, the following `acts_on` values can be set to define precisely when a task must be called on event messages:
Global events -> `ObjectEvent`, `LinkEvent`, `TagEvent`

- called on all events: `""`
- called on all new events: `"new"`
- called on all new or update events: `"(new|update)"`
Yeti objects specialisation -> `ObjectEvent`

- called on all events related to observables or entity objects: `"(new|update|delete):(observable|entity)"`
- called on all events related to observables: `"(new|update|delete):observable"`
- called on all events related to url: `"(new|update|delete):observable:url"`
- called on all events related to ipv4 and ipv6: `"(new|update|delete):observable:(ipv4|ipv6)"`
- called on all new campaign or vulnerability events: `"new:entity:(campaign|vulnerability)"`
- called on tag creation, update or deletion: `"(new|update|delete):tag"`
Link events specialisation -> `LinkEvent`

- called on all events related to links: `"(new|update|delete):link"`
- called on all events related to links having an observable as source: `"(new|update|delete):link:source:observable"`
- called on all events related to links having an observable as target: `"(new|update|delete):link:target:observable"`
Tagged event specialisation -> `TagEvent`

- called on all events related to tag links: `"(new|update|delete):tagged"`
- called on all events when an object is tagged with malware or c2: `"(new|update|delete):tagged:(malware|c2)"`
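The routing rules above can be sketched with the stdlib `re` module; this is an illustrative sketch, not Yeti's actual `match` implementation, and it assumes events render to strings such as `"new:observable:ipv4"`:

```python
import re


def matches(acts_on: str, event_repr: str) -> bool:
    """Return True when the acts_on regex matches the event's string form."""
    # An empty acts_on matches every event.
    if not acts_on:
        return True
    # re.match anchors at the start, so "new" matches "new:..." but not "update:...".
    return re.match(acts_on, event_repr) is not None
```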