operator
__all__ = ['GoogleBaseEventOperator', 'GoogleBaseFileOperator', 'GoogleDelayOperator', 'GoogleRedirectOperator'] (module attribute)
GoogleBaseEventOperator
Bases: BaseEventOperator
Base operator for event operations in Google Cloud.
__init__() -> None
Initializes the GoogleBaseEventOperator.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Type | Description |
---|---|
dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the chained message data and the first topic. |
execute(data: dict, topic: str)
Executes event processing logic.
Subclasses must override this method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | The data associated with the event. | required |
topic | str | The event topic. | required |
Raises:
Type | Description |
---|---|
NotImplementedError | Always raised by the base implementation; subclasses must override this method. |
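Because the base `execute` only raises `NotImplementedError`, a concrete operator supplies its own implementation. The sketch below shows that pattern in isolation; `StubEventOperator` is a hypothetical stand-in for `GoogleBaseEventOperator` (so the snippet runs without the real package) and `EchoOperator` is a hypothetical subclass.

```python
class StubEventOperator:
    """Hypothetical stand-in for GoogleBaseEventOperator (not the real class)."""

    def execute(self, data: dict, topic: str):
        # Mirrors the documented base behaviour: subclasses must override this.
        raise NotImplementedError("execute must be implemented by subclasses")


class EchoOperator(StubEventOperator):
    """Hypothetical subclass that simply returns what it received."""

    def execute(self, data: dict, topic: str):
        # A real operator would do its work here; this one echoes its inputs.
        return {"topic": topic, "data": data}


print(EchoOperator().execute({"x": 1}, "my-topic"))
try:
    StubEventOperator().execute({}, "my-topic")
except NotImplementedError as exc:
    print("base class:", exc)
```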
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Type | Description |
---|---|
str | The extracted message ID. |
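For Pub/Sub-triggered functions, the CloudEvent's `data` carries the Pub/Sub message, and that message has a `messageId` field. A minimal sketch, assuming the operator reads the ID from `cloud_event.data["message"]["messageId"]` (the source does not say which field it uses); `FakeCloudEvent` is a hypothetical stand-in for `cloudevents.http.CloudEvent` so the snippet runs without dependencies.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeCloudEvent:
    """Hypothetical stand-in exposing only the `data` attribute of a CloudEvent."""
    data: dict = field(default_factory=dict)


def extract_message_id(cloud_event: Any) -> str:
    # Pub/Sub CloudEvents nest the message under data["message"].
    # Assumption: the operator uses the Pub/Sub messageId as its message ID.
    return cloud_event.data["message"]["messageId"]


event = FakeCloudEvent(data={"message": {"messageId": "1234567890", "data": ""}})
print(extract_message_id(event))  # 1234567890
```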
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. | None |
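A minimal sketch of the documented two-step behaviour (log the error, then publish it to a queue). The `publish` callable and the payload shape are hypothetical; the real operator presumably builds its payload with `build_error_message` and uses its own queue client. Unlike the documented method, this sketch returns the payload so it can be inspected.

```python
import json
import logging
from typing import Callable, Optional

logger = logging.getLogger("operator")


def report_error(
    message: str,
    data: Optional[dict] = None,
    publish: Callable[[bytes], None] = lambda payload: None,
) -> dict:
    # Hypothetical payload shape; the real one comes from build_error_message.
    error = {"error": message, "data": data or {}}
    # 1. Log the error locally.
    logger.error("%s | data=%s", message, data)
    # 2. Publish it to an error queue (here: any callable that accepts bytes).
    publish(json.dumps(error).encode("utf-8"))
    return error


sent = []
report_error("missing key 'seconds'", {"topic": "delay"}, publish=sent.append)
print(sent[0])
```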
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
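For Pub/Sub triggers, the message body typically arrives base64-encoded in `cloud_event.data["message"]["data"]`, and the CloudEvent `source` attribute has the form `//pubsub.googleapis.com/projects/PROJECT/topics/TOPIC`. The sketch below shows one way `run` could decode such an event and dispatch to `execute(data, topic)`. It assumes a JSON payload and that the topic name is taken from `source`; `decode_event` and `FakeCloudEvent` are hypothetical.

```python
import base64
import json
from typing import Any, Callable, Tuple


class FakeCloudEvent:
    """Hypothetical stand-in: attributes via cloud_event["..."], payload via .data."""

    def __init__(self, attributes: dict, data: dict):
        self._attributes = attributes
        self.data = data

    def __getitem__(self, key: str) -> Any:
        return self._attributes[key]


def decode_event(cloud_event: Any) -> Tuple[dict, str]:
    # Pub/Sub delivers the body base64-encoded; assume it contains JSON.
    raw = base64.b64decode(cloud_event.data["message"]["data"])
    data = json.loads(raw)
    # source looks like //pubsub.googleapis.com/projects/PROJECT/topics/TOPIC
    topic = cloud_event["source"].rsplit("/", 1)[-1]
    return data, topic


def run(cloud_event: Any, execute: Callable[[dict, str], Any]) -> Any:
    # Decode the event, then hand off to the operator-specific execute().
    data, topic = decode_event(cloud_event)
    return execute(data, topic)


payload = base64.b64encode(json.dumps({"seconds": 5}).encode()).decode()
event = FakeCloudEvent(
    {"source": "//pubsub.googleapis.com/projects/demo/topics/delay"},
    {"message": {"data": payload, "messageId": "1"}},
)
print(run(event, lambda data, topic: (topic, data)))  # ('delay', {'seconds': 5})
```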
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |
GoogleBaseFileOperator
Bases: BaseFileOperator
Base operator for file operations in Google Cloud.
__init__() -> None
Initializes the GoogleBaseFileOperator.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to file operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Type | Description |
---|---|
dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the chained message data and the first topic. |
execute(bucket: str, filepath: str)
Executes file processing logic.
Subclasses must override this method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the bucket where the file is located. | required |
filepath | str | The path to the file within the bucket. | required |
Raises:
Type | Description |
---|---|
NotImplementedError | Always raised by the base implementation; subclasses must override this method. |
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Type | Description |
---|---|
str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes file logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the file. | required |
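Cloud Storage CloudEvents carry the bucket name and object path in their `data` payload under the `bucket` and `name` keys. A minimal sketch of how `run` could map those onto `execute(bucket, filepath)`, assuming that is the mapping the operator uses; `FakeCloudEvent` is a hypothetical stand-in for a real CloudEvent.

```python
from typing import Any, Callable


class FakeCloudEvent:
    """Hypothetical stand-in exposing only the .data payload of a CloudEvent."""

    def __init__(self, data: dict):
        self.data = data


def run(cloud_event: Any, execute: Callable[[str, str], Any]) -> Any:
    # Cloud Storage events report the bucket and the object's path ("name").
    bucket = cloud_event.data["bucket"]
    filepath = cloud_event.data["name"]
    return execute(bucket, filepath)


event = FakeCloudEvent({"bucket": "my-bucket", "name": "incoming/report.csv"})
print(run(event, lambda bucket, path: f"gs://{bucket}/{path}"))  # gs://my-bucket/incoming/report.csv
```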
GoogleDelayOperator
Bases: GoogleBaseEventOperator, DelayOperator
Operator that adds a delay to the pipeline.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Type | Description |
---|---|
dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the chained message data and the first topic. |
execute(data: dict, topic: str) -> None
Executes the delay operation.
Sleeps for the number of seconds given in data['seconds'], capped at a maximum of 500 seconds.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | A dictionary whose 'seconds' key sets how many seconds the operator waits. | required |
topic | str | The topic associated with the event. Not used by this operator. | required |
Returns:
Type | Description |
---|---|
None | Nothing is returned. |
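The capped wait amounts to `min(seconds, 500)`. A standalone sketch of that logic follows. The injectable `sleep` argument and the returned wait time are additions for testability, and clamping negative values to zero is an extra safeguard that the documentation does not mention.

```python
import time
from typing import Callable

MAX_DELAY_SECONDS = 500.0  # cap stated in the documentation


def execute(data: dict, topic: str, sleep: Callable[[float], None] = time.sleep) -> float:
    """Hypothetical standalone version of GoogleDelayOperator.execute."""
    del topic  # documented as unused by this operator
    requested = float(data["seconds"])
    # Cap at 500 s; clamping negatives to 0 is an added assumption.
    wait = max(0.0, min(requested, MAX_DELAY_SECONDS))
    sleep(wait)
    return wait


print(execute({"seconds": 900}, "unused", sleep=lambda s: None))  # 500.0
```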
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Type | Description |
---|---|
str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |
GoogleRedirectOperator
Bases: GoogleBaseEventOperator, RedirectOperator
Google Cloud implementation of RedirectOperator.
Operator that receives one event from a Google Pub/Sub topic and publishes multiple messages to another topic.
add_key(obj: dict, keys: list, value: Any) -> dict
Adds a value to a nested dictionary at a specified key path.
Given a dictionary, a list of keys describing a path into it, and a value, this method sets the value at that path. Any intermediate keys that do not yet exist are created.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj | dict | The dictionary to which the keys and value will be added. | required |
keys | list | A list of keys representing the path in the dictionary. | required |
value | Any | The value to set at the specified key path. | required |
Returns:
Type | Description |
---|---|
dict | A new dictionary with the value added at the specified key path. |
Example
```python
obj = {'a': {'b': 1}}
keys = ['a', 'c']
value = 2
result = self.add_key(obj, keys, value)
# result == {'a': {'b': 1, 'c': 2}}
```
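A standalone sketch consistent with the documented behaviour and example: missing intermediate keys are created and, because the docstring says a *new* dictionary is returned, the input is deep-copied first. Whether the real method copies or mutates in place is an assumption, and the empty-`keys` check is an addition.

```python
import copy
from typing import Any


def add_key(obj: dict, keys: list, value: Any) -> dict:
    """Hypothetical standalone version of GoogleRedirectOperator.add_key."""
    if not keys:
        raise ValueError("keys must contain at least one key")
    result = copy.deepcopy(obj)  # leave the caller's dictionary untouched
    node = result
    for key in keys[:-1]:
        # Walk down the path, creating empty dicts for missing levels.
        node = node.setdefault(key, {})
    node[keys[-1]] = value
    return result


obj = {"a": {"b": 1}}
print(add_key(obj, ["a", "c"], 2))  # {'a': {'b': 1, 'c': 2}}
print(obj)                          # {'a': {'b': 1}}
```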
add_param_to_message(message: dict, param: dict) -> list
Adds a parameter's values to a single message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | dict | The original message to which parameters will be added. | required |
param | dict | A dictionary containing a 'key' and a list of 'values'. | required |
Returns:
Type | Description |
---|---|
list | A list of new messages with the parameter added. |
add_param_to_messages(messages: list, param: dict) -> list
Adds a single parameter to each message in a list of messages.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to modify. | required |
param | dict | A dictionary containing a 'key' and a list of 'values'. | required |
Returns:
Type | Description |
---|---|
list | A list of modified messages with the parameter added. |
add_params_to_messages(messages: list, params: list) -> list
Adds parameters to each message in a list of messages.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to be modified. | required |
params | list of dict | A list of parameter dictionaries, each containing a 'key' and a list of 'values'. | required |
Returns:
Type | Description |
---|---|
list | A list of messages with the parameters added. |
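Read together, the three `add_param*` methods suggest a fan-out: each parameter value produces its own copy of a message, so applying several parameters yields every combination of their values. The sketch below is one possible implementation under two assumptions the source does not confirm: `param["key"]` is either a single key or a list of keys forming a path (as `add_key` expects), and each value is written with `add_key` semantics.

```python
import copy
from typing import Any


def add_key(obj: dict, keys: list, value: Any) -> dict:
    # Nested set on a deep copy, matching the documented add_key behaviour.
    result = copy.deepcopy(obj)
    node = result
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = value
    return result


def add_param_to_message(message: dict, param: dict) -> list:
    # Assumption: param["key"] is a single key or a list of keys (a path).
    keys = param["key"] if isinstance(param["key"], list) else [param["key"]]
    # One new message per value.
    return [add_key(message, keys, value) for value in param["values"]]


def add_param_to_messages(messages: list, param: dict) -> list:
    # Fan every message out over the parameter's values.
    return [new for msg in messages for new in add_param_to_message(msg, param)]


def add_params_to_messages(messages: list, params: list) -> list:
    # Apply the parameters one after another: the result is their cartesian product.
    for param in params:
        messages = add_param_to_messages(messages, param)
    return messages


base = [{"job": "export"}]
params = [
    {"key": "region", "values": ["eu", "us"]},
    {"key": ["options", "format"], "values": ["csv", "json"]},
]
for msg in add_params_to_messages(base, params):
    print(msg)
```

With these inputs the loop prints four messages, one for each region and format combination, and `base` itself is left unchanged.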
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Type | Description |
---|---|
dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the chained message data and the first topic. |
execute(data: dict, topic: str) -> None
Executes the operator, publishing messages to a specified topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | A dictionary of event data with the keys project (str), the project hosting the destination queue; topic (str), the queue topic to which the newly generated messages are published; messages (list), the initial messages to publish; and params (list of dict), the parameters used to modify the messages. | required |
topic | str | The topic to publish the messages to. | required |
Returns:
Type | Description |
---|---|
None | Nothing is returned. |
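An end-to-end sketch of the flow described by the `data` parameter: take `data["messages"]`, expand them with `data["params"]`, and publish each result to `data["topic"]` in `data["project"]`. The `publish(project, topic, payload)` callable is hypothetical and stands in for a Pub/Sub client; with `google-cloud-pubsub`, one would typically build the path with `PublisherClient.topic_path(project, topic)` and call `publish(path, data=payload)`. The sketch supports only flat parameter keys, which is a simplification.

```python
import itertools
import json
from typing import Callable


def execute(data: dict, topic: str, publish: Callable[[str, str, bytes], None]) -> None:
    """Hypothetical standalone version of GoogleRedirectOperator.execute."""
    # The `topic` argument is unused here; the destination comes from data["topic"].
    project, destination = data["project"], data["topic"]
    params = data.get("params", [])
    keys = [param["key"] for param in params]
    # Every combination of parameter values (a single empty combo if no params).
    combos = list(itertools.product(*(param["values"] for param in params)))
    for message in data["messages"]:
        for combo in combos:
            # Assumes flat keys; nested paths would need add_key-style handling.
            new_message = {**message, **dict(zip(keys, combo))}
            publish(project, destination, json.dumps(new_message).encode("utf-8"))


sent = []
execute(
    {
        "project": "demo",
        "topic": "jobs",
        "messages": [{"job": "export"}],
        "params": [{"key": "region", "values": ["eu", "us"]}],
    },
    topic="redirect",
    publish=lambda project, topic, payload: sent.append((project, topic, payload)),
)
print(len(sent))  # 2
```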
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Type | Description |
---|---|
str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |