operator

__all__ = ['GoogleBaseEventOperator', 'GoogleBaseFileOperator', 'GoogleDelayOperator', 'GoogleRedirectOperator'] module-attribute

GoogleBaseEventOperator

Bases: BaseEventOperator

Base operator for event operations in Google Cloud.

__init__() -> None

Initializes the GoogleBaseEventOperator.

build_error_message(message: str, data: dict) -> dict

Builds an error message specific to event operations.

Parameters:

Name Type Description Default
message str

The error message.

required
data dict

The associated data.

required

Returns:

Name Type Description
dict dict

A constructed error message.

chain_messages(messages: list) -> tuple

Chains messages together for processing.

Parameters:

Name Type Description Default
messages list

A list of messages to chain.

required

Returns:

Name Type Description
tuple tuple

A tuple containing chained message data and the first topic.

execute(data: dict, topic: str)

Executes event processing logic.

Subclasses must implement this method.

Parameters:

Name Type Description Default
data dict

The data associated with the event.

required
topic str

The event topic.

required

Raises:

Type Description
NotImplementedError

Raised when a subclass does not override this method.
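
The subclassing contract described for execute can be sketched as follows. This is a minimal illustration only: StandInEventOperator is a hypothetical stand-in that mimics the documented behavior (execute raising NotImplementedError), and UppercaseOperator is an invented example subclass. The real GoogleBaseEventOperator, its import path, and its constructor arguments are not reproduced here.

```python
class StandInEventOperator:
    """Hypothetical stand-in that mirrors only the documented execute() contract."""

    def execute(self, data: dict, topic: str):
        raise NotImplementedError("Subclasses must implement execute().")


class UppercaseOperator(StandInEventOperator):
    """Invented example subclass that overrides execute()."""

    def execute(self, data: dict, topic: str):
        # Return a transformed copy so the incoming payload is left untouched.
        return {key: str(value).upper() for key, value in data.items()}


result = UppercaseOperator().execute({"name": "report"}, topic="example-topic")
print(result)
```

Calling execute on the stand-in base class directly raises NotImplementedError, which matches the Raises entry above.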

extract_message_id(cloud_event) -> str

Extracts the message ID from the cloud event.

Parameters:

Name Type Description Default
cloud_event CloudEvent

The cloud event from which to extract the message ID.

required

Returns:

Name Type Description
str str

The extracted message ID.
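
As a rough sketch of what extracting the message ID might look like, the function below assumes the event carries a Pub/Sub payload whose data has a nested message entry with a messageId field. Whether the operator reads this field or another attribute of the CloudEvent is not stated here, so treat the lookup path as an assumption. A SimpleNamespace stands in for a real CloudEvent object.

```python
from types import SimpleNamespace


def extract_message_id(cloud_event) -> str:
    """Sketch: read the Pub/Sub message ID from a CloudEvent-like object."""
    # Assumption: cloud_event.data is a dict with a nested "message" entry.
    message = cloud_event.data.get("message", {})
    # Assumption: a missing ID is reported as an empty string.
    return message.get("messageId", "")


# Stand-in for a real CloudEvent; only the .data attribute is modeled.
fake_event = SimpleNamespace(
    data={"message": {"messageId": "1234567890", "data": "e30="}}
)
message_id = extract_message_id(fake_event)
print(message_id)
```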

report_error(message: str, data: dict = None)

Reports an error by logging it and publishing to a queue.

Parameters:

Name Type Description Default
message str

The error message to report.

required
data dict

Additional data associated with the error. Defaults to None.

None
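
The log-then-publish behavior can be illustrated with a small sketch. The payload shape, the logger name, and the in-memory error_queue list are all hypothetical; the real method publishes to a queue whose client and message format are not documented here.

```python
import logging

logger = logging.getLogger("operator-sketch")
error_queue = []  # Hypothetical in-memory stand-in for the real error queue.


def report_error(message: str, data: dict = None) -> dict:
    """Sketch: log the error, then publish a payload to the stand-in queue."""
    # Assumption: a missing data argument is published as an empty dict.
    payload = {"message": message, "data": data if data is not None else {}}
    logger.error("Operator error: %s", message)
    error_queue.append(payload)
    return payload


reported = report_error("file not found", {"filepath": "in/data.csv"})
```

Using None as the default and replacing it inside the function avoids sharing one mutable dictionary across calls.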

run(cloud_event) -> None

Processes the incoming cloud event and executes event logic.

Parameters:

Name Type Description Default
cloud_event CloudEvent

The cloud event containing metadata about the event.

required

run_next(tasks: list) -> None

Executes the next tasks in the pipeline.

Parameters:

Name Type Description Default
tasks list

A list of tasks to execute next.

required

GoogleBaseFileOperator

Bases: BaseFileOperator

Base operator for file operations in Google Cloud.

__init__() -> None

Initializes the GoogleBaseFileOperator.

build_error_message(message: str, data: dict) -> dict

Builds an error message specific to file operations.

Parameters:

Name Type Description Default
message str

The error message.

required
data dict

The associated data.

required

Returns:

Name Type Description
dict dict

A constructed error message.

chain_messages(messages: list) -> tuple

Chains messages together for processing.

Parameters:

Name Type Description Default
messages list

A list of messages to chain.

required

Returns:

Name Type Description
tuple tuple

A tuple containing chained message data and the first topic.

execute(bucket: str, filepath: str)

Executes file processing logic.

Subclasses must implement this method.

Parameters:

Name Type Description Default
bucket str

The name of the bucket where the file is located.

required
filepath str

The path to the file within the bucket.

required

Raises:

Type Description
NotImplementedError

Raised when a subclass does not override this method.

extract_message_id(cloud_event) -> str

Extracts the message ID from the cloud event.

Parameters:

Name Type Description Default
cloud_event CloudEvent

The cloud event from which to extract the message ID.

required

Returns:

Name Type Description
str str

The extracted message ID.

report_error(message: str, data: dict = None)

Reports an error by logging it and publishing to a queue.

Parameters:

Name Type Description Default
message str

The error message to report.

required
data dict

Additional data associated with the error. Defaults to None.

None

run(cloud_event) -> None

Processes the incoming cloud event and executes file logic.

Parameters:

Name Type Description Default
cloud_event CloudEvent

The cloud event containing metadata about the file.

required
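
The split between run and execute documented for GoogleBaseFileOperator can be sketched as below. StandInFileOperator and RecordingFileOperator are hypothetical classes written for this illustration, and the sketch assumes the event data exposes the bucket under "bucket" and the object path under "name", as Cloud Storage object events do. How the real run method reads the event is not shown in this reference.

```python
from types import SimpleNamespace


class StandInFileOperator:
    """Hypothetical stand-in for the documented run()/execute() split."""

    def run(self, cloud_event) -> None:
        # Assumption: the event data follows the Cloud Storage object shape.
        data = cloud_event.data
        self.execute(data["bucket"], data["name"])

    def execute(self, bucket: str, filepath: str):
        raise NotImplementedError("Subclasses must implement execute().")


class RecordingFileOperator(StandInFileOperator):
    """Invented subclass that records the files it was asked to process."""

    def __init__(self) -> None:
        self.seen = []

    def execute(self, bucket: str, filepath: str):
        self.seen.append(f"gs://{bucket}/{filepath}")


file_op = RecordingFileOperator()
file_op.run(SimpleNamespace(data={"bucket": "example-bucket", "name": "in/data.csv"}))
print(file_op.seen)
```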

GoogleDelayOperator

Bases: GoogleBaseEventOperator, DelayOperator

Operator that adds a delay to the pipeline.

build_error_message(message: str, data: dict) -> dict

Builds an error message specific to event operations.

Parameters:

Name Type Description Default
message str

The error message.

required
data dict

The associated data.

required

Returns:

Name Type Description
dict dict

A constructed error message.

chain_messages(messages: list) -> tuple

Chains messages together for processing.

Parameters:

Name Type Description Default
messages list

A list of messages to chain.

required

Returns:

Name Type Description
tuple tuple

A tuple containing chained message data and the first topic.

execute(data: dict, topic: str) -> None

Executes the delay operation.

The method sleeps for the number of seconds given in data['seconds'], up to a maximum of 500 seconds.

Parameters:

Name Type Description Default
data dict

A dictionary containing a key 'seconds' which determines how many seconds the operator should wait.

required
topic str

The topic with which the event is associated. This operator does not use this parameter.

required

Returns:

Type Description
None

None.
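
The capped delay can be sketched as follows. The 500-second limit comes from the description above; the helper name capped_delay, the injectable sleep argument, and the treatment of missing or negative values as zero are assumptions made for this example.

```python
import time

MAX_DELAY_SECONDS = 500.0  # Cap stated in the description above.


def capped_delay(data: dict) -> float:
    """Return the number of seconds to wait, capped at the maximum."""
    # Assumption: a missing or negative 'seconds' value is treated as zero.
    requested = float(data.get("seconds", 0))
    return max(0.0, min(requested, MAX_DELAY_SECONDS))


def execute(data: dict, topic: str, sleep=time.sleep) -> None:
    """Sketch of the delay step; `sleep` is injectable so tests need not wait."""
    sleep(capped_delay(data))


# Record the requested waits instead of actually sleeping.
waits = []
execute({"seconds": 900}, topic="unused", sleep=waits.append)
execute({"seconds": 2}, topic="unused", sleep=waits.append)
print(waits)
```

Passing the sleep function as an argument keeps the example fast to run; in normal use the default time.sleep applies.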

extract_message_id(cloud_event) -> str

Extracts the message ID from the cloud event.

Parameters:

Name Type Description Default
cloud_event CloudEvent

The cloud event from which to extract the message ID.

required

Returns:

Name Type Description
str str

The extracted message ID.

report_error(message: str, data: dict = None)

Reports an error by logging it and publishing to a queue.

Parameters:

Name Type Description Default
message str

The error message to report.

required
data dict

Additional data associated with the error. Defaults to None.

None

run(cloud_event) -> None

Processes the incoming cloud event and executes event logic.

Parameters:

Name Type Description Default
cloud_event CloudEvent

The cloud event containing metadata about the event.

required

run_next(tasks: list) -> None

Executes the next tasks in the pipeline.

Parameters:

Name Type Description Default
tasks list

A list of tasks to execute next.

required

GoogleRedirectOperator

Bases: GoogleBaseEventOperator, RedirectOperator

Google Cloud implementation of RedirectOperator.

Operator that receives one event from a Google Pub/Sub topic and publishes multiple messages to another topic.

add_key(obj: dict, keys: list, value: Any) -> dict

Adds a value to a nested dictionary at a specified key path.

This method takes an object (dictionary), a list of keys representing a path to a location in that dictionary, and a value to insert at that location. If the specified nested keys do not exist in the dictionary, they will be created.

Parameters:

Name Type Description Default
obj dict

The dictionary to which the keys and value will be added.

required
keys list

A list of keys representing the path in the dictionary.

required
value Any

The value to be set at the specified key path.

required

Returns:

Name Type Description
dict dict

A new dictionary with the value added at the specified key path.

Example

obj = {'a': {'b': 1}}
keys = ['a', 'c']
value = 2
result = add_key(self, obj, keys, value)

result will be {'a': {'b': 1, 'c': 2}}
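
One way to get the behavior described for add_key is sketched below as a plain function rather than a method. It copies the input first, since the Returns entry mentions a new dictionary. Replacing a non-dict value found on the path with a fresh dictionary, and requiring a non-empty keys list, are assumptions of this sketch rather than documented behavior.

```python
from copy import deepcopy
from typing import Any


def add_key(obj: dict, keys: list, value: Any) -> dict:
    """Sketch: return a copy of obj with value set at the nested key path."""
    result = deepcopy(obj)  # Copy first so the input dictionary is not mutated.
    cursor = result
    for key in keys[:-1]:
        # Create intermediate dictionaries that do not exist yet.
        # Assumption: a non-dict value on the path is replaced by a dict.
        if not isinstance(cursor.get(key), dict):
            cursor[key] = {}
        cursor = cursor[key]
    cursor[keys[-1]] = value
    return result


original = {"a": {"b": 1}}
updated = add_key(original, ["a", "c"], 2)
created = add_key(original, ["x", "y", "z"], True)
print(updated)
print(created)
```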

add_param_to_message(message: dict, param: dict) -> list

Adds a parameter's values to a single message.

Parameters:

Name Type Description Default
message dict

The original message to which parameters will be added.

required
param dict

A dictionary containing a 'key' and a list of 'values'.

required

Returns:

Name Type Description
list list

A list of new messages with the parameter added.
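
The expansion of one message into several can be sketched as below. The sketch assumes that param['key'] names a top-level field of the message; the real method may accept a nested path instead, which this reference does not specify.

```python
def add_param_to_message(message: dict, param: dict) -> list:
    """Sketch: produce one copy of the message per value in param['values']."""
    # Assumption: param['key'] names a top-level field of the message.
    return [{**message, param["key"]: value} for value in param["values"]]


expanded = add_param_to_message(
    {"report": "sales"},
    {"key": "region", "values": ["eu", "us"]},
)
print(expanded)
```

Each output message is a new dictionary, so the original message is left unchanged.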

add_param_to_messages(messages: list, param: dict) -> list

Adds a single parameter to each message in a list of messages.

Parameters:

Name Type Description Default
messages list

A list of messages to modify.

required
param dict

A dictionary containing a 'key' and 'values'.

required

Returns:

Name Type Description
list list

A list of modified messages with the parameter added.

add_params_to_messages(messages: list, params: list) -> list

Adds parameters to each message in a list of messages.

Parameters:

Name Type Description Default
messages list

A list of messages to be modified.

required
params list of dict

A list of parameter dictionaries, each containing a key and a list of values.

required

Returns:

Name Type Description
list list

A list of messages with the parameters added.
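
Applying several parameters in turn plausibly multiplies the message list, one copy per combination of values. The sketch below shows that reading; the combination order and the use of top-level keys are assumptions, not confirmed behavior of the real method.

```python
def add_params_to_messages(messages: list, params: list) -> list:
    """Sketch: apply each parameter in turn, multiplying the message list."""
    for param in params:
        # Assumption: every message is paired with every value of the parameter.
        messages = [
            {**message, param["key"]: value}
            for message in messages
            for value in param["values"]
        ]
    return messages


fanned_out = add_params_to_messages(
    [{"report": "sales"}],
    [
        {"key": "region", "values": ["eu", "us"]},
        {"key": "year", "values": [2023, 2024]},
    ],
)
print(len(fanned_out))
```

With one starting message, two regions, and two years, this reading yields four messages.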

build_error_message(message: str, data: dict) -> dict

Builds an error message specific to event operations.

Parameters:

Name Type Description Default
message str

The error message.

required
data dict

The associated data.

required

Returns:

Name Type Description
dict dict

A constructed error message.

chain_messages(messages: list) -> tuple

Chains messages together for processing.

Parameters:

Name Type Description Default
messages list

A list of messages to chain.

required

Returns:

Name Type Description
tuple tuple

A tuple containing chained message data and the first topic.

execute(data: dict, topic: str) -> None

Executes the operator, publishing messages to a specified topic.

Parameters:

Name Type Description Default
data dict

A dictionary containing event data with keys:

- project (str): The project where the destination queue is hosted.
- topic (str): The queue topic to which the newly generated messages will be published.
- messages (list): A list of initial messages to publish.
- params (list of dict): A list of parameters to modify the messages.

required
topic str

The topic to publish the messages to.

required

Returns:

Type Description
None

None
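
A payload for this operator might look like the sketch below. The four keys come from the parameter description; every value is illustrative, and the message count assumes that each message is combined with every value of every parameter, which this reference implies but does not state outright.

```python
from math import prod

# Hypothetical payload using the four documented keys; all values are illustrative.
data = {
    "project": "example-project",
    "topic": "example-destination-topic",
    "messages": [{"report": "sales"}],
    "params": [
        {"key": "region", "values": ["eu", "us"]},
        {"key": "year", "values": [2023, 2024]},
    ],
}

# Assumption: every message is combined with every value of every parameter.
expected_count = len(data["messages"]) * prod(
    len(param["values"]) for param in data["params"]
)
print(expected_count)
```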

extract_message_id(cloud_event) -> str

Extracts the message ID from the cloud event.

Parameters:

Name Type Description Default
cloud_event CloudEvent

The cloud event from which to extract the message ID.

required

Returns:

Name Type Description
str str

The extracted message ID.

report_error(message: str, data: dict = None)

Reports an error by logging it and publishing to a queue.

Parameters:

Name Type Description Default
message str

The error message to report.

required
data dict

Additional data associated with the error. Defaults to None.

None

run(cloud_event) -> None

Processes the incoming cloud event and executes event logic.

Parameters:

Name Type Description Default
cloud_event CloudEvent

The cloud event containing metadata about the event.

required

run_next(tasks: list) -> None

Executes the next tasks in the pipeline.

Parameters:

Name Type Description Default
tasks list

A list of tasks to execute next.

required