
operator

__all__ = ['FileUrlToGcsOperator', 'FtpToGcsOperator', 'FileDetectOperator', 'BatchWriteDetectOperator', 'BatchWriteProcessOperator', 'FileDeleteOperator', 'FileMoveOperator', 'GoogleErrorReprocessOperator'] (module attribute)

BatchWriteDetectOperator

Bases: GoogleBaseEventOperator

Operator to detect batches of files in GCS based on thresholds.

This operator lists the files under a GCS bucket prefix and groups them by directory. For each directory, it checks whether the accumulated size, number of files, or age of the files exceeds the thresholds passed in the event data. If so, it sends a message to a Pub/Sub topic to process that batch.
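The grouping and threshold check described above can be sketched as follows. This is a minimal illustration, not the operator's actual code: it assumes each listed file is a hypothetical `(name, size_bytes, updated)` tuple and that the threshold dict uses the `'size'`, `'file_quantity'` and `'minutes'` keys documented for `execute()`. A missing key simply disables that check.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
import posixpath


def group_by_directory(files):
    """Group hypothetical (name, size_bytes, updated) tuples by parent directory."""
    groups = defaultdict(list)
    for name, size, updated in files:
        groups[posixpath.dirname(name)].append((name, size, updated))
    return dict(groups)


def batch_exceeds_threshold(batch, threshold, now=None):
    """Return True if total size, file count, or age of the oldest file exceeds a threshold."""
    now = now or datetime.now(timezone.utc)
    inf = float("inf")  # a missing threshold key never triggers
    total_size = sum(size for _, size, _ in batch)
    oldest = min(updated for _, _, updated in batch)
    age_minutes = (now - oldest) / timedelta(minutes=1)
    return (
        total_size > threshold.get("size", inf)
        or len(batch) > threshold.get("file_quantity", inf)
        or age_minutes > threshold.get("minutes", inf)
    )


# Usage with a fixed clock so the result is deterministic
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
files = [
    ("landing/a/1.json", 100, now - timedelta(minutes=5)),
    ("landing/a/2.json", 200, now - timedelta(minutes=1)),
    ("landing/b/1.json", 50, now),
]
groups = group_by_directory(files)
ready = {d: batch_exceeds_threshold(b, {"size": 250, "minutes": 30}, now) for d, b in groups.items()}
```

Here `landing/a` is ready because its 300 bytes exceed the 250-byte size threshold, while `landing/b` stays below every threshold.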

Note

This class is deprecated and will be removed in a future version. Use GcsDatalakeHook for writing files directly to the datalake.

__init__()

Initializes the BatchWriteDetectOperator.

Sets up FileHook and GcsHook.

build_error_message(message: str, data: dict) -> dict

Builds an error message specific to event operations.

Parameters:

- message (str, required): The error message.
- data (dict, required): The associated data.

Returns:

- dict: A constructed error message.

chain_messages(messages: list) -> tuple

Chains messages together for processing.

Parameters:

- messages (list, required): A list of messages to chain.

Returns:

- tuple: A tuple containing the chained message data and the first topic.

execute(data, topic)

Executes the batch detection logic.

Parameters:

- data (dict, required): A dictionary containing the following parameters:
  - bucket (str, optional): The GCS bucket. Defaults to GCS_BUCKET_LANDING_ZONE.
  - prefix (str, optional): The GCS prefix to scan.
  - threshold (dict): A dictionary with thresholds for 'size' (bytes), 'file_quantity', and 'minutes' (age).
- topic (str, required): The Pub/Sub topic to publish messages to (unused in current logic).

extract_message_id(cloud_event) -> str

Extracts the message ID from the cloud event.

Parameters:

- cloud_event (CloudEvent, required): The cloud event from which to extract the message ID.

Returns:

- str: The extracted message ID.
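For a Pub/Sub-triggered function, the CloudEvent payload usually nests the message under `message`, with a `messageId` field and base64-encoded `data`. The sketch below assumes that shape and works on a plain dict standing in for `cloud_event.data`; the helper names are illustrative and are not part of this module.

```python
import base64
import json


def message_id_from(event_data: dict) -> str:
    """Return the Pub/Sub message ID from a CloudEvent data payload (assumed shape)."""
    return event_data["message"]["messageId"]


def payload_from(event_data: dict) -> dict:
    """Decode the base64 `data` field of the Pub/Sub message into a dict."""
    raw = base64.b64decode(event_data["message"]["data"])
    return json.loads(raw)


# Usage: build a fake event the way Pub/Sub would encode it
payload = {"bucket": "my-bucket", "prefix": "landing/"}
event_data = {
    "message": {
        "messageId": "1234567890",
        "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii"),
    }
}
```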

report_error(message: str, data: dict = None)

Reports an error by logging it and publishing to a queue.

Parameters:

- message (str, required): The error message to report.
- data (dict, optional): Additional data associated with the error. Defaults to None.

run(cloud_event) -> None

Processes the incoming cloud event and executes event logic.

Parameters:

- cloud_event (CloudEvent, required): The cloud event containing metadata about the event.

run_next(tasks: list) -> None

Executes the next tasks in the pipeline.

Parameters:

- tasks (list, required): A list of tasks to execute next.

send_to_process(bucket, directory, files)

Sends a message to Pub/Sub to process a batch of files.

Parameters:

- bucket (str, required): The GCS bucket of the files.
- directory (str, required): The common directory (prefix) of the files.
- files (list, required): A list of filenames in the batch.
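The `bucket`, `directory` and `files` arguments match the keys that `BatchWriteProcessOperator.execute()` documents for its `data`, so the message appears to be consumed by that operator. A minimal sketch of building the payload follows; JSON encoding is an assumption, and the publish call (using `google-cloud-pubsub`) is shown only as a comment and is not executed.

```python
import json


def build_process_message(bucket: str, directory: str, files: list) -> bytes:
    """Encode a batch as JSON bytes, mirroring BatchWriteProcessOperator's documented keys."""
    return json.dumps({"bucket": bucket, "directory": directory, "files": files}).encode("utf-8")


# Publishing would then look roughly like this with google-cloud-pubsub (not run here):
#   from google.cloud import pubsub_v1
#   publisher = pubsub_v1.PublisherClient()
#   publisher.publish(publisher.topic_path(project_id, topic_name), message)

message = build_process_message("landing-bucket", "landing/a", ["1.json", "2.json"])
```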

BatchWriteProcessOperator

Bases: GoogleBaseEventOperator

Operator to process batches of files from GCS.

This operator reads multiple JSON files from a specified GCS location, merges their content into a single local NDJSON file, uploads the merged file to another GCS location, and then moves the original processed files to an archive location.

Note

This class is deprecated and will be removed in a future version. Use GcsDatalakeHook for writing files directly to the datalake.

__init__()

Initializes the BatchWriteProcessOperator.

Sets up FileHook and GcsHook.

build_error_message(message: str, data: dict) -> dict

Builds an error message specific to event operations.

Parameters:

- message (str, required): The error message.
- data (dict, required): The associated data.

Returns:

- dict: A constructed error message.

chain_messages(messages: list) -> tuple

Chains messages together for processing.

Parameters:

- messages (list, required): A list of messages to chain.

Returns:

- tuple: A tuple containing the chained message data and the first topic.

execute(data, topic)

Executes the batch processing logic.

Parameters:

- data (dict, required): A dictionary containing the following parameters:
  - bucket (str): The source GCS bucket.
  - directory (str): The directory within the source bucket.
  - files (list): A list of filenames to process.
- topic (str, required): The Pub/Sub topic from which the message was received (unused).

extract_message_id(cloud_event) -> str

Extracts the message ID from the cloud event.

Parameters:

- cloud_event (CloudEvent, required): The cloud event from which to extract the message ID.

Returns:

- str: The extracted message ID.

merge_files(file_contents)

Merges file contents into a local NDJSON file.

Parameters:

Name Type Description Default
file_contents list

A list of dictionaries or objects to write.

required

Returns:

Name Type Description
str

The path to the created local NDJSON file.
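A minimal sketch of writing records as NDJSON (one JSON document per line) to a local file. Using `tempfile.mkstemp` to choose the output path is an assumption, not necessarily what `merge_files` does.

```python
import json
import os
import tempfile


def merge_files(file_contents: list) -> str:
    """Write each record as one JSON line to a temp file and return its path."""
    fd, path = tempfile.mkstemp(suffix=".ndjson")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        for record in file_contents:
            fh.write(json.dumps(record, ensure_ascii=False) + "\n")
    return path


path = merge_files([{"id": 1}, {"id": 2}])
with open(path, encoding="utf-8") as fh:
    lines = fh.read().splitlines()
os.remove(path)  # clean up the example file
```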

read_files(bucket, directory, files)

Reads multiple JSON files from GCS and concatenates their contents.

Parameters:

- bucket (str, required): The GCS bucket.
- directory (str, required): The directory within the bucket.
- files (list, required): A list of filenames to read.

Returns:

- list: A list containing the combined content of the JSON files.

Raises:

- Exception: If the content of a file is not a list or dict.
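The concatenation rule can be sketched as below, assuming each file has already been downloaded and parsed with `json.loads` (the GCS read is omitted). Treating a top-level list as several records and a top-level dict as one record is an assumption consistent with the Raises entry above; the helper name is hypothetical.

```python
def combine_contents(contents: list) -> list:
    """Concatenate parsed JSON contents: lists are extended, dicts appended."""
    combined = []
    for content in contents:
        if isinstance(content, list):
            combined.extend(content)
        elif isinstance(content, dict):
            combined.append(content)
        else:
            # TypeError is a subclass of Exception, matching the documented Raises entry
            raise TypeError(f"Unsupported file content type: {type(content).__name__}")
    return combined


records = combine_contents([[{"a": 1}, {"a": 2}], {"a": 3}])
try:
    combine_contents(["plain string"])
except TypeError as exc:
    error_message = str(exc)
```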

report_error(message: str, data: dict = None)

Reports an error by logging it and publishing to a queue.

Parameters:

- message (str, required): The error message to report.
- data (dict, optional): Additional data associated with the error. Defaults to None.

run(cloud_event) -> None

Processes the incoming cloud event and executes event logic.

Parameters:

- cloud_event (CloudEvent, required): The cloud event containing metadata about the event.

run_next(tasks: list) -> None

Executes the next tasks in the pipeline.

Parameters:

- tasks (list, required): A list of tasks to execute next.

FileDeleteOperator

Bases: GoogleBaseEventOperator

Operator to delete files or prefixes in a GCS bucket.

__init__()

Initializes the FileDeleteOperator.

Sets up the GcsHook.

build_error_message(message: str, data: dict) -> dict

Builds an error message specific to event operations.

Parameters:

- message (str, required): The error message.
- data (dict, required): The associated data.

Returns:

- dict: A constructed error message.

chain_messages(messages: list) -> tuple

Chains messages together for processing.

Parameters:

- messages (list, required): A list of messages to chain.

Returns:

- tuple: A tuple containing the chained message data and the first topic.

execute(data, topic)

Executes the file deletion logic.

Parameters:

- data (dict, required): A dictionary containing the following parameters:
  - bucket (str): The GCS bucket.
  - prefix (str, optional): The prefix to delete.
  - files (list, optional): A list of specific filepaths to delete.
- topic (str, required): The Pub/Sub topic (unused).

Raises:

- Exception: If neither 'prefix' nor 'files' is provided.
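The payload check can be sketched as a small validation step run before any deletion. The function name is hypothetical, and raising `ValueError` (a subclass of `Exception`) is an illustrative choice; the `google-cloud-storage` calls appear only as comments and are not executed.

```python
def validate_delete_payload(data: dict) -> tuple:
    """Return (bucket, prefix, files) or raise if there is nothing to delete."""
    prefix = data.get("prefix")
    files = list(data.get("files") or [])
    if not prefix and not files:
        raise ValueError("Either 'prefix' or 'files' must be provided")
    return data["bucket"], prefix, files


# With google-cloud-storage, deletion could then use, e.g. (not run here):
#   for blob in client.list_blobs(bucket, prefix=prefix): blob.delete()
#   for name in files: client.bucket(bucket).blob(name).delete()

try:
    validate_delete_payload({"bucket": "b"})
except ValueError as exc:
    error_message = str(exc)
```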

extract_message_id(cloud_event) -> str

Extracts the message ID from the cloud event.

Parameters:

- cloud_event (CloudEvent, required): The cloud event from which to extract the message ID.

Returns:

- str: The extracted message ID.

report_error(message: str, data: dict = None)

Reports an error by logging it and publishing to a queue.

Parameters:

- message (str, required): The error message to report.
- data (dict, optional): Additional data associated with the error. Defaults to None.

run(cloud_event) -> None

Processes the incoming cloud event and executes event logic.

Parameters:

- cloud_event (CloudEvent, required): The cloud event containing metadata about the event.

run_next(tasks: list) -> None

Executes the next tasks in the pipeline.

Parameters:

- tasks (list, required): A list of tasks to execute next.

FileDetectOperator

Bases: GoogleBaseFileOperator

Operator to detect files in GCS and send messages to a queue.

This operator is triggered when a new file is detected in a GCS bucket. It reads configuration for the file, builds success messages, and publishes them to a specified Google Cloud Pub/Sub topic.

__init__()

Initializes the FileDetectOperator.

Sets up the GCS hook for interacting with Google Cloud Storage.

build_error_message(message: str, data: dict) -> dict

Builds an error message specific to file operations.

Parameters:

- message (str, required): The error message.
- data (dict, required): The associated data.

Returns:

- dict: A constructed error message.

build_success_message(bucket, filepath)

Builds success messages based on the file's ingestion configuration.

Parameters:

- bucket (str, required): The GCS bucket of the file.
- filepath (str, required): The path to the file.

Returns:

- list: A list of dictionaries, each representing a success message containing metadata for file processing.

chain_messages(messages: list) -> tuple

Chains messages together for processing.

Parameters:

- messages (list, required): A list of messages to chain.

Returns:

- tuple: A tuple containing the chained message data and the first topic.

execute(bucket, filepath)

Executes the operator's logic.

Builds success messages based on the detected file and its configuration, then publishes these messages to the configured Pub/Sub topic.

Parameters:

- bucket (str, required): The GCS bucket where the file was detected.
- filepath (str, required): The path to the detected file within the bucket.

extract_message_id(cloud_event) -> str

Extracts the message ID from the cloud event.

Parameters:

- cloud_event (CloudEvent, required): The cloud event from which to extract the message ID.

Returns:

- str: The extracted message ID.

get_ingest_config(filepath)

Retrieves ingestion configuration for a given filepath.

Parses the filepath to determine dataset, table, and mode. Reads a configuration file from GCS based on dataset and table. Handles single or multiple configurations within the config file.

Parameters:

- filepath (str, required): The GCS path to the file.

Returns:

- tuple: A tuple containing various configuration parameters.

Raises:

- NotImplementedError: If the metadata format in the config file is not a list or dict.
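Handling a config file that holds either one configuration (a dict) or several (a list) can be sketched by normalizing to a list first. The function name is hypothetical, and the sketch covers only the format check that raises `NotImplementedError`, not the rest of the configuration lookup.

```python
def normalize_config(metadata) -> list:
    """Return a list of config entries from a single dict or a list of dicts."""
    if isinstance(metadata, dict):
        return [metadata]
    if isinstance(metadata, list):
        return metadata
    raise NotImplementedError(f"Unsupported metadata format: {type(metadata).__name__}")


try:
    normalize_config("mode=append")
except NotImplementedError as exc:
    error_message = str(exc)
```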

read_config_file(dataset, table)

Reads the ingestion configuration file from GCS.

If the config file is not found, returns a default configuration.

Parameters:

- dataset (str, required): The dataset name.
- table (str, required): The table name.

Returns:

- dict or list: The configuration data, or a default configuration if the file is not found.

report_error(message: str, data: dict = None)

Reports an error by logging it and publishing to a queue.

Parameters:

- message (str, required): The error message to report.
- data (dict, optional): Additional data associated with the error. Defaults to None.

run(cloud_event) -> None

Processes the incoming cloud event and executes file logic.

Parameters:

- cloud_event (CloudEvent, required): The cloud event containing metadata about the file.

split_filepath(filepath)

Splits a GCS filepath into dataset, table, and mode.

Parameters:

- filepath (str, required): The GCS filepath string.

Returns:

- tuple: A tuple containing dataset (str), table (str), and mode (str).

Raises:

- Exception: If the filepath format is invalid.
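A sketch of the parsing step under an explicitly hypothetical layout, `<dataset>/<table>/<mode>/<filename>`; the real path convention is not documented here, and `parse_landing_path` is an illustrative name. For a Cloud Storage-triggered function, the object path is typically read from the `name` field of the event data (and the bucket from `bucket`), which the example uses as input.

```python
def parse_landing_path(filepath: str) -> tuple:
    """Split a path laid out as <dataset>/<table>/<mode>/<filename> (hypothetical layout)."""
    parts = filepath.strip("/").split("/")
    if len(parts) < 4 or not all(parts[:3]):
        raise ValueError(f"Invalid filepath format: {filepath}")
    dataset, table, mode = parts[:3]
    return dataset, table, mode


# Stand-in for the data of a Cloud Storage object-finalized event
event_data = {"bucket": "landing-zone", "name": "sales/orders/append/2024-01-01.json"}
dataset, table, mode = parse_landing_path(event_data["name"])

try:
    parse_landing_path("sales/orders.json")
except ValueError as exc:
    error_message = str(exc)
```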

FileMoveOperator

Bases: GoogleBaseEventOperator

Operator to move files or prefixes within or between GCS buckets.

__init__()

Initializes the FileMoveOperator.

Sets up the GcsHook.

build_error_message(message: str, data: dict) -> dict

Builds an error message specific to event operations.

Parameters:

- message (str, required): The error message.
- data (dict, required): The associated data.

Returns:

- dict: A constructed error message.

chain_messages(messages: list) -> tuple

Chains messages together for processing.

Parameters:

- messages (list, required): A list of messages to chain.

Returns:

- tuple: A tuple containing the chained message data and the first topic.

execute(data, topic)

Executes the file moving logic.

Parameters:

- data (dict, required): A dictionary containing the following parameters:
  - origin (dict): Contains 'bucket' (str) and 'prefix' (str) for the source.
  - destination (dict): Contains 'bucket' (str) and 'directory' (str) for the destination.
- topic (str, required): The Pub/Sub topic (unused).
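How each source object is renamed under the destination `directory` is not documented; the sketch below assumes the path relative to the origin `prefix` is preserved. In `google-cloud-storage`, a move is commonly done as `Bucket.copy_blob(...)` followed by `Blob.delete()`; those calls appear only as a comment and are not executed. The function name is hypothetical.

```python
import posixpath


def destination_name(blob_name: str, origin_prefix: str, destination_directory: str) -> str:
    """Map a source object name to its new name under the destination directory."""
    if not blob_name.startswith(origin_prefix):
        raise ValueError(f"{blob_name!r} is not under prefix {origin_prefix!r}")
    relative = blob_name[len(origin_prefix):].lstrip("/")
    # A prefix that names a single object leaves nothing relative; keep its basename
    relative = relative or posixpath.basename(blob_name)
    return posixpath.join(destination_directory, relative)


# The move itself could then be, e.g. (not run here):
#   src_bucket.copy_blob(blob, dst_bucket, new_name=destination_name(blob.name, prefix, directory))
#   blob.delete()
```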

extract_message_id(cloud_event) -> str

Extracts the message ID from the cloud event.

Parameters:

- cloud_event (CloudEvent, required): The cloud event from which to extract the message ID.

Returns:

- str: The extracted message ID.

report_error(message: str, data: dict = None)

Reports an error by logging it and publishing to a queue.

Parameters:

- message (str, required): The error message to report.
- data (dict, optional): Additional data associated with the error. Defaults to None.

run(cloud_event) -> None

Processes the incoming cloud event and executes event logic.

Parameters:

- cloud_event (CloudEvent, required): The cloud event containing metadata about the event.

run_next(tasks: list) -> None

Executes the next tasks in the pipeline.

Parameters:

- tasks (list, required): A list of tasks to execute next.

FileUrlToGcsOperator

Bases: GoogleBaseEventOperator

Operator for transferring files from a URL to GCS.

__init__() -> None

Initializes the FileUrlToGcsOperator.

build_error_message(message: str, data: dict) -> dict

Builds an error message specific to event operations.

Parameters:

- message (str, required): The error message.
- data (dict, required): The associated data.

Returns:

- dict: A constructed error message.

chain_messages(messages: list) -> tuple

Chains messages together for processing.

Parameters:

- messages (list, required): A list of messages to chain.

Returns:

- tuple: A tuple containing the chained message data and the first topic.

execute(data: Dict[str, Any], topic: str) -> None

Executes the file transfer from URL to GCS.

Parameters:

- data (Dict[str, Any], required): The data containing URL and GCS information.
- topic (str, required): The Pub/Sub topic.

extract_message_id(cloud_event) -> str

Extracts the message ID from the cloud event.

Parameters:

- cloud_event (CloudEvent, required): The cloud event from which to extract the message ID.

Returns:

- str: The extracted message ID.

move_to_destinations(local_filepath: str, destination: Union[Dict[str, Any], List[Dict[str, Any]]]) -> None

Moves the downloaded file to the specified destinations.

Parameters:

- local_filepath (str, required): The local file path.
- destination (Union[Dict[str, Any], List[Dict[str, Any]]], required): The destination, or list of destinations, for the file.
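Because `destination` may be a single dict or a list of dicts, a natural first step is to normalize it to a list and derive one target per entry. The sketch assumes hypothetical `bucket` and optional `directory` keys (the real keys are not documented here) and keeps the local filename; the actual upload with `google-cloud-storage` is shown only as a comment.

```python
import os
import posixpath


def plan_uploads(local_filepath: str, destination) -> list:
    """Return (bucket, object_name) pairs for one destination dict or a list of them."""
    destinations = [destination] if isinstance(destination, dict) else list(destination)
    filename = os.path.basename(local_filepath)
    plan = []
    for dest in destinations:
        # Hypothetical keys: "bucket" (required) and "directory" (optional)
        plan.append((dest["bucket"], posixpath.join(dest.get("directory", ""), filename)))
    return plan


# Each pair could then be uploaded with google-cloud-storage (not run here):
#   client.bucket(bucket).blob(name).upload_from_filename(local_filepath)
```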

remove_null_byte(local_filepath: str) -> None

Removes null bytes from the specified file.

Parameters:

- local_filepath (str, required): The local file path.
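A minimal sketch of stripping NUL (`0x00`) bytes from a downloaded file in place. It reads the whole file into memory, which is an assumption that suits small files; for large downloads, a chunked copy to a temporary file would keep memory bounded.

```python
import os
import tempfile


def remove_null_byte(local_filepath: str) -> None:
    """Rewrite the file in place without any NUL (0x00) bytes."""
    with open(local_filepath, "rb") as fh:
        content = fh.read()
    with open(local_filepath, "wb") as fh:
        fh.write(content.replace(b"\x00", b""))


# Usage on a throwaway file
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as fh:
    fh.write(b"a\x00b,c\x00\n")
remove_null_byte(path)
with open(path, "rb") as fh:
    cleaned = fh.read()
os.remove(path)
```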

report_error(message: str, data: dict = None)

Reports an error by logging it and publishing to a queue.

Parameters:

- message (str, required): The error message to report.
- data (dict, optional): Additional data associated with the error. Defaults to None.

run(cloud_event) -> None

Processes the incoming cloud event and executes event logic.

Parameters:

- cloud_event (CloudEvent, required): The cloud event containing metadata about the event.

run_next(tasks: list) -> None

Executes the next tasks in the pipeline.

Parameters:

- tasks (list, required): A list of tasks to execute next.

FtpToGcsOperator

Bases: FileUrlToGcsOperator

Operator for transferring files from FTP to GCS.

__init__() -> None

Initializes the FtpToGcsOperator.

build_error_message(message: str, data: dict) -> dict

Builds an error message specific to event operations.

Parameters:

- message (str, required): The error message.
- data (dict, required): The associated data.

Returns:

- dict: A constructed error message.

chain_messages(messages: list) -> tuple

Chains messages together for processing.

Parameters:

- messages (list, required): A list of messages to chain.

Returns:

- tuple: A tuple containing the chained message data and the first topic.

execute(data: Dict[str, str], topic: str) -> None

Executes the FTP to GCS transfer.

Parameters:

- data (Dict[str, str], required): The data containing FTP and GCS information.
- topic (str, required): The Pub/Sub topic.

extract_message_id(cloud_event) -> str

Extracts the message ID from the cloud event.

Parameters:

- cloud_event (CloudEvent, required): The cloud event from which to extract the message ID.

Returns:

- str: The extracted message ID.

move_to_destinations(local_filepath: str, destination: Union[Dict[str, Any], List[Dict[str, Any]]]) -> None

Moves the downloaded file to the specified destinations.

Parameters:

- local_filepath (str, required): The local file path.
- destination (Union[Dict[str, Any], List[Dict[str, Any]]], required): The destination, or list of destinations, for the file.

remove_null_byte(local_filepath: str) -> None

Removes null bytes from the specified file.

Parameters:

- local_filepath (str, required): The local file path.

report_error(message: str, data: dict = None)

Reports an error by logging it and publishing to a queue.

Parameters:

- message (str, required): The error message to report.
- data (dict, optional): Additional data associated with the error. Defaults to None.

run(cloud_event) -> None

Processes the incoming cloud event and executes event logic.

Parameters:

- cloud_event (CloudEvent, required): The cloud event containing metadata about the event.

run_next(tasks: list) -> None

Executes the next tasks in the pipeline.

Parameters:

- tasks (list, required): A list of tasks to execute next.

GoogleErrorReprocessOperator

Bases: GoogleBaseEventOperator, ErrorReprocessOperator

Operator for reprocessing errors in Google Cloud.

__init__() -> None

Initializes the GoogleErrorReprocessOperator.

build_error_message(message: str, data: dict) -> dict

Builds an error message specific to event operations.

Parameters:

- message (str, required): The error message.
- data (dict, required): The associated data.

Returns:

- dict: A constructed error message.

chain_messages(messages: list) -> tuple

Chains messages together for processing.

Parameters:

- messages (list, required): A list of messages to chain.

Returns:

- tuple: A tuple containing the chained message data and the first topic.

execute(data, topic)

Executes the error processing logic for the given data.

This method retrieves the necessary metadata from the input data and handles retries according to the specified parameters: it either republishes the event to its original topic or, once the maximum number of retries has been exceeded, saves the error details to the datalake.

Parameters:

- data (dict, required): The event data to be processed.
- topic (str, required): The topic from which the event was received.

Raises:

- KeyError: If required keys are missing from the data dictionary.
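The retry decision can be sketched as follows. The keys `topic`, `retries` and `max_retries`, and the default of 3 retries, are assumptions for illustration; the operator's real field names are not documented here. A missing `topic` raises `KeyError`, in line with the Raises entry.

```python
def decide_reprocess(error_event: dict) -> tuple:
    """Return (action, topic, payload) for one error event."""
    topic = error_event["topic"]  # KeyError if missing, as documented
    retries = error_event.get("retries", 0)
    max_retries = error_event.get("max_retries", 3)
    if retries < max_retries:
        # Republish the original payload with the retry counter incremented
        return "republish", topic, {**error_event, "retries": retries + 1}
    # Retries exhausted: keep the error for inspection in the datalake
    return "save_to_datalake", topic, error_event


event = {"topic": "orders", "data": {"id": 1}, "retries": 0, "max_retries": 2}
exhausted = {**event, "retries": 2}
```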

extract_message_id(cloud_event) -> str

Extracts the message ID from the cloud event.

Parameters:

- cloud_event (CloudEvent, required): The cloud event from which to extract the message ID.

Returns:

- str: The extracted message ID.

report_error(message: str, data: dict = None)

Reports an error by logging it and publishing to a queue.

Parameters:

- message (str, required): The error message to report.
- data (dict, optional): Additional data associated with the error. Defaults to None.

run(cloud_event) -> None

Processes the incoming cloud event and executes event logic.

Parameters:

- cloud_event (CloudEvent, required): The cloud event containing metadata about the event.

run_next(tasks: list) -> None

Executes the next tasks in the pipeline.

Parameters:

- tasks (list, required): A list of tasks to execute next.