operator
__all__ = ['BaseHttpOperator', 'BaseFileOperator', 'BaseEventOperator', 'DelayOperator', 'ErrorReprocessOperator', 'RedirectOperator'] (module attribute)
BaseEventOperator
Bases: BaseOperator
BaseEventOperator class to handle event trigger operations.
This class extends BaseOperator for operations triggered by events.
__init__()
Initializes the BaseEventOperator class.
Sets the trigger type to 'event' and initializes necessary attributes.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Name | Type | Description |
---|---|---|
dict | dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Name | Type | Description |
---|---|---|
tuple | tuple | A tuple containing chained message data and the first topic. |
execute(data: dict, topic: str)
Executes event processing logic.
This method needs to be implemented in subclasses.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | The data associated with the event. | required |
topic | str | The event topic. | required |
Raises:
Type | Description |
---|---|
NotImplementedError | Always raised by the base implementation; subclasses must override this method. |
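The execute contract above is a template-method pattern: the base class provides the surrounding plumbing and leaves execute to subclasses. The sketch below illustrates that contract with a hypothetical stand-in class (EventOperatorSketch) instead of the real BaseEventOperator, whose import path is not shown in this reference; the trigger_type attribute name and the EchoOperator subclass are likewise illustrative assumptions.

```python
from typing import Any


class EventOperatorSketch:
    """Hypothetical stand-in for BaseEventOperator (not the library class)."""

    def __init__(self) -> None:
        # Mirrors the documented behaviour of setting the trigger type.
        self.trigger_type = "event"

    def execute(self, data: dict, topic: str) -> Any:
        # The base implementation only defines the contract.
        raise NotImplementedError("execute() must be implemented by subclasses")


class EchoOperator(EventOperatorSketch):
    """Hypothetical subclass that echoes the payload and topic it receives."""

    def execute(self, data: dict, topic: str) -> Any:
        return {"topic": topic, "received": data}


print(EchoOperator().execute({"seconds": 5}, "delay"))
# {'topic': 'delay', 'received': {'seconds': 5}}
```

A concrete operator such as DelayOperator plays the role of EchoOperator here: it overrides execute and inherits everything else.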
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Name | Type | Description |
---|---|---|
str | str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. Defaults to None. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
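Assuming the events arrive from Pub/Sub, the cloud event wraps the published message: the body is base64-encoded under data['message']['data'], the message ID is under data['message']['messageId'], and the source attribute ends with the topic name (//pubsub.googleapis.com/projects/<project>/topics/<topic>). The sketch below decodes those pieces using plain dictionaries in place of a CloudEvent object and assumes a JSON message body. decode_pubsub_event is a hypothetical helper; whether run and extract_message_id read exactly these fields is an assumption.

```python
import base64
import json


def decode_pubsub_event(source: str, event_data: dict) -> tuple[dict, str, str]:
    """Return (payload, topic, message_id) from a Pub/Sub cloud event.

    `source` is the event's 'source' attribute and `event_data` its data
    section. Assumes the published message body is JSON.
    """
    message = event_data["message"]
    # Pub/Sub delivers the message body base64-encoded.
    payload = json.loads(base64.b64decode(message["data"]).decode("utf-8"))
    # The topic name is the last path segment of the source attribute.
    topic = source.rsplit("/", 1)[-1]
    return payload, topic, message["messageId"]
```

The decoded payload and topic correspond to the data and topic arguments that execute(data, topic) receives.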
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |
BaseFileOperator
Bases: BaseOperator
BaseFileOperator class to handle file trigger operations.
This class extends BaseOperator for operations triggered by files.
__init__()
Initializes the BaseFileOperator class.
Sets the trigger type to 'file' and initializes necessary attributes.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to file operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Name | Type | Description |
---|---|---|
dict | dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Name | Type | Description |
---|---|---|
tuple | tuple | A tuple containing chained message data and the first topic. |
execute(bucket: str, filepath: str)
Executes file processing logic.
This method needs to be implemented in subclasses.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the bucket where the file is located. | required |
filepath | str | The path to the file within the bucket. | required |
Raises:
Type | Description |
---|---|
NotImplementedError | Always raised by the base implementation; subclasses must override this method. |
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Name | Type | Description |
---|---|---|
str | str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. Defaults to None. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes file logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the file. | required |
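Assuming the file trigger is a Cloud Storage object event, the event data names the bucket under 'bucket' and the object path under 'name', which map onto execute(bucket, filepath). The sketch below shows that routing with a plain dictionary standing in for the event data; dispatch_file_event is a hypothetical helper, and whether run reads exactly these fields is an assumption.

```python
from typing import Any, Callable


def dispatch_file_event(event_data: dict, execute: Callable[[str, str], Any]) -> Any:
    """Route a Cloud Storage object event to an execute(bucket, filepath) callable."""
    # Cloud Storage object events name the bucket under 'bucket' and the
    # object path within it under 'name'.
    bucket = event_data["bucket"]
    filepath = event_data["name"]
    return execute(bucket, filepath)
```

Other fields in the event data (size, content type and so on) are ignored here because execute only takes the bucket and the path.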
BaseHttpOperator
Bases: BaseOperator
BaseHttpOperator class to handle HTTP trigger operations.
This class extends BaseOperator for operations triggered by HTTP requests.
__init__()
Initializes the BaseHttpOperator class.
Sets the trigger type to 'http' and initializes necessary attributes.
build_error_message(message: str, request) -> dict
Builds an error message specific to HTTP operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
request | Request | The HTTP request object. | required |
Returns:
Name | Type | Description |
---|---|---|
dict | dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Name | Type | Description |
---|---|---|
tuple | tuple | A tuple containing chained message data and the first topic. |
execute(request)
Executes HTTP request processing logic.
This method needs to be implemented in subclasses.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request | Request | The HTTP request object. | required |
Raises:
Type | Description |
---|---|
NotImplementedError | Always raised by the base implementation; subclasses must override this method. |
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Name | Type | Description |
---|---|---|
str | str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. Defaults to None. | None |
run(request) -> None
Processes the incoming HTTP request and executes processing logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request | Request | The HTTP request object. | required |
DelayOperator
Bases: BaseEventOperator
Introduces a delay in the processing pipeline.
This operator sleeps for a specified amount of time in seconds. The maximum delay is capped at 500 seconds.
__init__()
Initializes the DelayOperator.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Name | Type | Description |
---|---|---|
dict | dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Name | Type | Description |
---|---|---|
tuple | tuple | A tuple containing chained message data and the first topic. |
execute(data: dict, topic: str) -> None
Executes the delay operation.
The method sleeps for the number of seconds given in data['seconds'], up to a maximum of 500 seconds.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | A dictionary whose 'seconds' key sets how many seconds the operator waits. | required |
topic | str | The topic the event is associated with. This operator does not use it. | required |
Returns:
Type | Description |
---|---|
None | The method returns once the delay has elapsed. |
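The capped wait described above amounts to sleeping for min(seconds, 500). The sketch below is a minimal version under that reading; capped_delay and MAX_DELAY_SECONDS are hypothetical names, the sleep function is injected so the logic can be exercised without waiting, and clamping negative values to zero is an added assumption (time.sleep rejects negative durations).

```python
import time
from typing import Callable

MAX_DELAY_SECONDS = 500  # cap documented for DelayOperator


def capped_delay(data: dict, sleep: Callable[[float], None] = time.sleep) -> float:
    """Sleep for data['seconds'], never longer than MAX_DELAY_SECONDS.

    Returns the number of seconds actually slept.
    """
    seconds = min(float(data["seconds"]), MAX_DELAY_SECONDS)
    # Assumption: negative requests are treated as "no delay".
    seconds = max(seconds, 0.0)
    sleep(seconds)
    return seconds
```

For example, capped_delay({"seconds": 900}) sleeps for 500 seconds, not 900.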
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Name | Type | Description |
---|---|---|
str | str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. Defaults to None. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |
ErrorReprocessOperator
Bases: BaseEventOperator
Operator to handle processing of erroneous events.
This operator manages retry logic for events that fail. It can reprocess events according to the configured number of retries and retry interval; once the maximum number of retries is exceeded, it saves the error details to the datalake.
__init__()
Initializes the ErrorReprocessOperator.
Performs the BaseEventOperator initialization along with any setup this operator needs.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Name | Type | Description |
---|---|---|
dict | dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Name | Type | Description |
---|---|---|
tuple | tuple | A tuple containing chained message data and the first topic. |
execute(data, topic)
Executes the error processing logic for the given data.
This method retrieves the necessary metadata from the input data and handles retries based on the specified parameters. It either publishes the retried event back to its original topic or, if the maximum number of retries has been exceeded, saves the error details to the datalake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | The event data that needs to be processed. | required |
topic | str | The topic from which the event is received. | required |
Raises:
Type | Description |
---|---|
KeyError | If required keys are missing from the data dictionary. |
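The retry decision described for execute can be sketched as a comparison between the attempts made so far and the configured maximum. Everything concrete below is an assumption for illustration: the function name reprocess, the keys 'topic', 'payload', 'retries' and 'max_retries', and the injected publish and save_error callables standing in for the queue publisher and the datalake writer. Retry intervals and attempt-count bookkeeping are not modelled.

```python
from typing import Callable


def reprocess(
    data: dict,
    publish: Callable[[str, dict], None],
    save_error: Callable[[dict], None],
) -> str:
    """Republish a failed event, or archive it once retries are exhausted.

    Hypothetical keys: 'topic' (original topic), 'payload' (original event),
    'retries' (attempts so far, default 0) and 'max_retries'. A missing
    required key raises KeyError, as documented for execute.
    """
    topic = data["topic"]
    payload = data["payload"]
    max_retries = data["max_retries"]
    retries = data.get("retries", 0)

    if retries < max_retries:
        # Still within budget: send the original event back to its topic.
        publish(topic, payload)
        return "retried"

    # Budget exhausted: hand the full error record to the datalake writer.
    save_error(data)
    return "saved"
```

Injecting publish and save_error keeps the decision logic separate from any particular queue or storage client.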
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Name | Type | Description |
---|---|---|
str | str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. Defaults to None. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |
RedirectOperator
Bases: BaseEventOperator
Operator that receives one event from a queue topic and publishes multiple messages to another topic.
This operator takes a dictionary of event data and publishes messages to a specified topic.
__init__()
Initializes the RedirectOperator.
add_key(obj: dict, keys: list, value: Any) -> dict
Adds a value to a nested dictionary at a specified key path.
This method takes a dictionary, a list of keys describing a path into that dictionary, and a value to insert at that location. Any keys along the path that do not yet exist are created.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj | dict | The dictionary to which the keys and value will be added. | required |
keys | list | A list of keys representing the path in the dictionary. | required |
value | Any | The value to be set at the specified key path. | required |
Returns:
Name | Type | Description |
---|---|---|
dict | dict | A new dictionary with the value added at the specified key path. |
Example
obj = {'a': {'b': 1}}
keys = ['a', 'c']
value = 2
result = self.add_key(obj, keys, value)
# result == {'a': {'b': 1, 'c': 2}}
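A standalone sketch of the nested insertion that add_key describes: walk the key path, create any missing intermediate dictionaries, and set the value at the last key. Because the method is documented as returning a new dictionary, this version deep-copies the input first; whether the library copies or mutates in place is an assumption, and this free function is not the library's own code.

```python
import copy
from typing import Any


def add_key(obj: dict, keys: list, value: Any) -> dict:
    """Return a copy of obj with value set at the nested path keys."""
    result = copy.deepcopy(obj)  # leave the caller's dictionary untouched
    node = result
    for key in keys[:-1]:
        # Reuse the existing sub-dictionary or create an empty one.
        node = node.setdefault(key, {})
    node[keys[-1]] = value
    return result


print(add_key({'a': {'b': 1}}, ['a', 'c'], 2))
# {'a': {'b': 1, 'c': 2}}
```

Intermediate keys that already hold a non-dictionary value are not handled; the sketch assumes the path passes only through dictionaries.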
add_param_to_message(message: dict, param: dict) -> list
Adds a parameter's values to a single message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | dict | The original message to which parameters will be added. | required |
param | dict | A dictionary containing a 'key' and a list of 'values'. | required |
Returns:
Name | Type | Description |
---|---|---|
list | list | A list of new messages with the parameter added. |
add_param_to_messages(messages: list, param: dict) -> list
Adds a single parameter to each message in a list of messages.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to modify. | required |
param | dict | A dictionary containing a 'key' and 'values'. | required |
Returns:
Name | Type | Description |
---|---|---|
list | list | A list of modified messages with the parameter added. |
add_params_to_messages(messages: list, params: list) -> list
Adds parameters to each message in a list of messages.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to be modified. | required |
params | list of dict | A list of parameter dictionaries, each containing a key and a list of values. | required |
Returns:
Name | Type | Description |
---|---|---|
list | list | A list of messages with the parameters added. |
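One reading of the three parameter helpers above is that each value of a parameter produces its own copy of a message, so applying several parameters in turn yields every combination of their values. The sketch below implements that reading. Treating 'key' as a dot-separated path (for example "opts.region") and the set_path helper are hypothetical; the library may interpret 'key' differently.

```python
import copy
from typing import Any


def set_path(obj: dict, path: str, value: Any) -> dict:
    """Copy obj and set value at a dot-separated path (hypothetical helper)."""
    result = copy.deepcopy(obj)
    keys = path.split(".")
    node = result
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = value
    return result


def add_param_to_message(message: dict, param: dict) -> list:
    # One new message per value of the parameter.
    return [set_path(message, param["key"], v) for v in param["values"]]


def add_param_to_messages(messages: list, param: dict) -> list:
    # Expand every message by the same parameter and flatten the result.
    return [new for m in messages for new in add_param_to_message(m, param)]


def add_params_to_messages(messages: list, params: list) -> list:
    # Applying parameters one after another yields every combination.
    for param in params:
        messages = add_param_to_messages(messages, param)
    return messages
```

Under this reading, one message expanded with two dates and two regions produces four messages.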
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Name | Type | Description |
---|---|---|
dict | dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages
|
list
|
A list of messages to chain. |
required |
Returns:
Name | Type | Description |
---|---|---|
tuple |
tuple
|
A tuple containing chained message data and the first topic. |
execute(data: dict, topic: str) -> None
Executes the operator, publishing messages to a specified topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | A dictionary containing the event data, with the keys: 'project' (str), the project that hosts the destination queue; 'topic' (str), the queue topic to which the newly generated messages are published; 'messages' (list), the initial messages to publish; and 'params' (list of dict), the parameters used to modify the messages. | required |
topic | str | The topic on which the triggering event was received. The messages are published to the topic given in data['topic']. | required |
Returns:
Type | Description |
---|---|
None | None |
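The execute flow above (expand the initial messages with the parameters, then publish each result to data['project'] and data['topic']) can be sketched with the publisher injected, so it runs without Pub/Sub credentials. The names redirect, no_expansion and Publisher are hypothetical, and JSON-encoding each message is an assumption about the wire format.

```python
import json
from typing import Callable

# (project, topic, body) -> None. Injected so the flow can run without Pub/Sub.
Publisher = Callable[[str, str, bytes], None]


def no_expansion(messages: list, params: list) -> list:
    """Placeholder expander that leaves the messages unchanged."""
    return list(messages)


def redirect(
    data: dict,
    publish: Publisher,
    expand: Callable[[list, list], list] = no_expansion,
) -> int:
    """Expand data['messages'] with data['params'] and publish each result.

    Returns the number of messages published.
    """
    messages = expand(data["messages"], data.get("params", []))
    for message in messages:
        # Pub/Sub message bodies are bytes, so each dict is sent as JSON.
        body = json.dumps(message).encode("utf-8")
        publish(data["project"], data["topic"], body)
    return len(messages)
```

In a deployed function, publish could wrap google.cloud.pubsub_v1.PublisherClient by building the path with topic_path(project, topic) and passing the body to publish. expand could be the parameter expansion that add_params_to_messages describes.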
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Name | Type | Description |
---|---|---|
str | str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. Defaults to None. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |