Challenge: Event-Driven Architecture
Extending the event-driven code example can give you a deeper understanding of the architecture, along with practical hands-on experience. As a starting point, take the event-driven architecture code example in the Git repository and build additional features. Here are some ideas, but feel free to come up with your own and share them in the comments section:
- Dynamic Event Registration: Allow new event consumers to be registered at runtime without altering the core event dispatch code.
- Event Prioritization: Implement a mechanism to prioritize certain types of events over others, possibly using a priority queue for event handling.
- Event Filtering: Add the ability to filter events based on various criteria, such as event type, source, or payload content.
- Event Transformation: Implement middleware functions that can transform the event payload before it reaches the consumers.
- Error Handling and Retry: Add a robust error-handling mechanism for event processing failures. Implement retry logic for failed events.
- Time-based Events: Create a scheduling component that can trigger events based on time or after specific intervals.
- Distributed Events: Extend the example to a distributed system. This could be implemented using message queues like RabbitMQ or Kafka.
- Event Auditing: Implement an event logging or auditing system to keep track of all processed events.
- Stateful Events: Modify the system to maintain state between events. This could be useful in scenarios like multi-step workflows.
- Event Persistence: Integrate a database to store events, useful for system auditing, replaying events, or debugging.
- Event Versioning: Introduce a mechanism to handle different versions of an event, particularly useful for backward compatibility.
- Rate Limiting: Add the ability to rate-limit event producers or consumers to prevent abuse or system overload.
- Event Grouping: Allow for related events to be grouped together for batch processing.
- Monitoring and Metrics: Implement a monitoring system to keep track of events processed, errors encountered, and other relevant metrics.
- Real-time Notifications: Extend the system to provide real-time notifications to administrators or users based on specific events or conditions.
- Conditional Event Routing: Implement logic to route events to different consumers based on certain conditions.
- Webhook Support: Allow external services to be event consumers by supporting webhooks.
- Authentication and Authorization: Add security layers to control who can produce or consume events.
- Data Validation: Implement robust validation mechanisms for the incoming event data to ensure integrity.
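As a concrete starting point for the Event Prioritization idea, here is a minimal sketch built on `asyncio.PriorityQueue`. The event names, priority values, and payloads are made up for illustration; lower numbers mean higher priority.

```python
import asyncio

async def drain(queue: asyncio.PriorityQueue) -> list:
    """Pop events in priority order (lower number = higher priority)."""
    order = []
    while not queue.empty():
        priority, event_type, payload = await queue.get()
        order.append(event_type)
    return order

async def priority_demo() -> list:
    queue = asyncio.PriorityQueue()
    # events are (priority, event_type, payload) tuples; the queue
    # orders them by the first element
    await queue.put((2, "user_logged_in", {"user": "alice"}))
    await queue.put((0, "system_alert", {"level": "critical"}))
    await queue.put((1, "order_placed", {"order_id": 42}))
    return await drain(queue)

print(asyncio.run(priority_demo()))  # ['system_alert', 'order_placed', 'user_logged_in']
```

Note that with distinct priorities the payload dicts are never compared; if two events can share a priority, you would add a tiebreaker (such as an insertion counter) before the payload.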
I have a question regarding 1. Dynamic Event Registration: Allow new event consumers to be registered at runtime without altering the core event dispatch code.
I'm not sure how to interpret this. Does this mean the program is running via
await asyncio.gather(produce_event(queue), general_event_consumer(queue))
and we register a consumer afterwards, which should be picked up automatically?
Or how should I interpret this?
Hi Joris, what I mean by this is that currently, the event consumers are registered in the main function and can't be changed once the program starts running. You could create an API code example that has endpoints for registering and unregistering event consumers. That way, you can change what happens when an event occurs without having to restart the program.
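To make that suggestion concrete, here is a minimal sketch of a consumer registry with register/unregister functions; in a real version, those two functions are what your API endpoints would call. All names here (`register_consumer`, `email_consumer`, and so on) are illustrative, not taken from the lesson's code.

```python
import asyncio

# hypothetical registry mapping event type -> consumer coroutine
registered_consumers = {}

def register_consumer(event_type, consumer):
    registered_consumers[event_type] = consumer

def unregister_consumer(event_type):
    registered_consumers.pop(event_type, None)

async def dispatch(event_type, event_data):
    """Core dispatch: unchanged no matter which consumers are registered."""
    consumer = registered_consumers.get(event_type)
    if consumer is None:
        return False  # no consumer registered -> event is dropped
    await consumer(event_type, event_data)
    return True

async def email_consumer(event_type, event_data):
    print(f"emailing about {event_type}: {event_data}")

async def registration_demo():
    registered_consumers.clear()  # fresh registry for the demo
    before = await dispatch("user_signed_up", {"user": "joris"})
    register_consumer("user_signed_up", email_consumer)  # "at runtime"
    after = await dispatch("user_signed_up", {"user": "joris"})
    return before, after

print(asyncio.run(registration_demo()))  # (False, True)
```

The point is that `dispatch` never changes; only the contents of the registry do, which is exactly what an HTTP endpoint could manipulate while the program runs.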
I have a question about item number 6. Asynchronous Event Processing. Doesn't the example code already handle events asynchronously? Or am I missing something?
You're correct! I will correct that right away!
I think it actually makes sense, because in the original implementation we consume each event sequentially. You can try setting the `asyncio.sleep` of every concrete consumer to 5 seconds and running the original code:
while True:
    priority, event_type, event_data, event_payload = await queue.get()
    consumer = registered_consumers.get(event_type)
    if consumer:
        await consumer(event_type, event_data)
You will see that there is a big delay between the consumers' responses.
However, if we do it this way:
async def general_event_consumer(queue: EventPriorityQueue):
    tasks = []
    while True:
        priority, event_type, event_data, event_payload = await queue.get()
        # create_task schedules the coroutine right away, so events are
        # processed concurrently; keeping the references also prevents the
        # tasks from being garbage-collected mid-flight
        tasks.append(asyncio.create_task(
            process_event(queue, priority, event_type, event_data, event_payload)
        ))
    # note: with an infinite loop this line is never reached; it would only
    # matter if the loop could exit (e.g. on a sentinel event)
    await asyncio.gather(*tasks)
async def process_event(queue, priority, event_type, event_data, event_payload):
    try:
        if random.randint(0, 10) > 7:
            raise EventProcessingError
        consumer = registered_consumers.get(event_type)
        if consumer:
            await consumer(event_type, event_data)
    except EventProcessingError:
        await handle_event_processing_error(queue, priority, event_type, event_payload)
Events will be processed both asynchronously and concurrently.
If I understood the task correctly, of course :)
Hi Stef and Oleksandr, indeed - though the original code does use async/await, that's mainly there to allow for the sleep functionality. What we mean by this particular challenge is the possibility of handling events concurrently, in other words, being able to indicate that some events can be handled concurrently with other events (using asyncio.gather). In that case, you also need a mechanism to define which events can be handled concurrently, and which events rely on other events being handled first.
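One possible mechanism for declaring such dependencies (a sketch under my own assumptions, not the lesson's design) is a per-event-type dependency map combined with `asyncio.Event`: independent events run concurrently via `asyncio.gather`, while dependent ones wait for their prerequisites to finish. All event names are made up.

```python
import asyncio

# illustrative dependency map: each event type lists the event types
# that must finish before it may run
DEPENDS_ON = {
    "send_receipt": {"charge_card"},
    "ship_order": {"charge_card"},
}

async def dependency_demo():
    finished = {t: asyncio.Event()
                for t in ("charge_card", "send_receipt", "ship_order")}
    order = []

    async def handle(event_type):
        # block until every prerequisite event type has completed
        for dep in DEPENDS_ON.get(event_type, ()):
            await finished[dep].wait()
        await asyncio.sleep(0.01)  # simulate work
        order.append(event_type)
        finished[event_type].set()

    # independent events run concurrently; dependent ones wait
    await asyncio.gather(handle("send_receipt"),
                         handle("ship_order"),
                         handle("charge_card"))
    return order

order = asyncio.run(dependency_demo())
print(order)  # charge_card first, then the other two in either order
```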
About Dynamic Event Registration
Are we talking about registering existing consumers?
In the sense that we add a new event type with a new consumer that registers an existing consumer (one that was already part of the code base before runtime - not dynamically loading DLLs)?
Hopefully I'm getting it right.
Hi Gabriel, yes, I'm assuming that there is a definition of the event consumer in the code and that you don't load that dynamically. You could do that at a later stage using a plugin architecture. But that's another lesson :).
I have two questions for 1. Dynamic Event Registration:
1. Does "at runtime" mean right at the beginning of when the program is started, or at any point while the program is running?
2. Is the "core event dispatch code" the produce_event function?
Great questions!
1. "At runtime" means any point while the program is running, not just at the beginning. In this context, it means that event consumers can be registered, unregistered, or modified during the execution of the program.
2. The term "core event dispatch code" generally refers to the part of an event-handling system responsible for receiving events and dispatching them to the appropriate event handlers or listeners.
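A minimal sketch of that dispatch role might look like this; the names are illustrative, and the sentinel shutdown exists only so the demo can stop the loop.

```python
import asyncio

handlers = {}  # event type -> handler coroutine (illustrative)

async def dispatcher(queue: asyncio.Queue):
    """The 'core event dispatch code': pull events off the queue and
    route them to whichever handler is registered for their type."""
    while True:
        event_type, payload = await queue.get()
        if event_type is None:  # sentinel so the demo can stop the loop
            break
        handler = handlers.get(event_type)
        if handler:
            await handler(payload)

async def dispatch_demo():
    dispatched = []

    async def on_ping(payload):
        dispatched.append(("ping", payload))

    handlers["ping"] = on_ping  # registration happens outside the dispatcher
    queue = asyncio.Queue()
    await queue.put(("ping", 1))
    await queue.put((None, None))
    await dispatcher(queue)
    return dispatched

print(asyncio.run(dispatch_demo()))  # [('ping', 1)]
```

The dispatcher itself never names any concrete consumer, which is what makes dynamic registration possible without altering it.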
Thanks!
Follow-up question for 1: the only way that comes to mind for this is to make it an API and expose an endpoint for registering "new" event consumers. I put "new" in quotes because the consumer would have to be already defined in the code, and the API endpoint could just activate it. Otherwise I could adapt the endpoint to accept code that defines event consumers and just run it, but that seems like a huge security risk.
Are there other alternatives to the API approach that I might be missing?
I am not sure I understand what you mean, can you elaborate?
Is it possible to provide some sample files extending these use cases beyond the Python script you talked through in the presentation? For example, how precisely would we set up a database that stores these events after checking they are genuine signals, what should we do in case of erroneous or faulty signals, and which service is best to use for the webhooks?
I noted there is a publication called "Architecture Patterns with Python: Enabling Test-Driven Development, Domain-Driven Design, and Event-Driven Microservices" by Percival and Gregory (2020) that elaborates on the event-driven pattern, but I am not sure whether it leads down a rabbit hole of jumping from resource to resource, losing focus on the actual task.
This is a great question. The way I see it, this fits quite nicely with the Hexagonal Architecture. You could imagine creating an abstraction layer that supports pushing to a database within the event-driven system itself. Alternatively (this is what we have implemented at my work), you can have a special channel where all log events are sent. We had one consumer listening to it, which would cache the log messages. After some time, or when the cache was full, that consumer (a database-related microservice) would push the log messages to the database. Note that we also had a force-push option: if a user wanted the latest log messages, the service would be notified and it would push the cache to the database. Because the events were stored in the database, you could easily read them or set up a notification channel (based on erroneous events) that filtered the log messages by log level.
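The batching log consumer described here could be sketched roughly like this; an in-memory SQLite table stands in for the real database, and the class name, threshold, and messages are placeholders.

```python
import asyncio
import sqlite3

class LogConsumer:
    """Caches log events and flushes them to the database in batches."""

    def __init__(self, db, flush_at=3):
        self.db = db
        self.flush_at = flush_at
        self.cache = []
        db.execute("CREATE TABLE IF NOT EXISTS logs (level TEXT, message TEXT)")

    async def on_log_event(self, level, message):
        self.cache.append((level, message))
        if len(self.cache) >= self.flush_at:
            self.flush()

    def flush(self):
        # the "force push": also called directly when a user wants the
        # latest messages in the database immediately
        self.db.executemany("INSERT INTO logs VALUES (?, ?)", self.cache)
        self.cache.clear()

async def logging_demo():
    db = sqlite3.connect(":memory:")
    consumer = LogConsumer(db, flush_at=3)
    await consumer.on_log_event("INFO", "service started")
    await consumer.on_log_event("ERROR", "something broke")
    before = db.execute("SELECT COUNT(*) FROM logs").fetchone()[0]
    consumer.flush()  # force push before the threshold is reached
    after = db.execute("SELECT COUNT(*) FROM logs").fetchone()[0]
    return before, after

print(asyncio.run(logging_demo()))  # (0, 2)
```

A production version would also flush on a timer and handle database errors, but the cache-then-batch shape is the same.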
This is just an idea, I do like your webhook question! I would like to know as well.
Hope this is somewhat helpful!
That's a good idea. I'll think about adding a lesson covering this example in more detail!