operator
__all__ = ['FileUrlToGcsOperator', 'FtpToGcsOperator', 'FileDetectOperator', 'BatchWriteDetectOperator', 'BatchWriteProcessOperator', 'FileDeleteOperator', 'FileMoveOperator', 'GoogleErrorReprocessOperator'] (module attribute)
BatchWriteDetectOperator
Bases: GoogleBaseEventOperator
Operator to detect batches of files in GCS based on thresholds.
This operator lists the files under a GCS bucket prefix and groups them by directory. For each directory, it checks whether the accumulated size, the number of files, or the age of the files exceeds a predefined threshold; if any of them does, it sends a message to a Pub/Sub topic to process the batch.
Note
This class is deprecated and will be removed in a future version.
Use GcsDatalakeHook for writing files directly to the datalake.
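The detection rule described above (group by directory, then compare size, count, and age against thresholds) can be sketched as a pure function. This is a minimal illustration, not the operator's code: ObjectInfo stands in for a GCS listing entry, and the threshold values and helper name are hypothetical because the real ones are not documented here.

```python
import posixpath
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List


@dataclass
class ObjectInfo:
    """Hypothetical stand-in for one entry of a GCS object listing."""
    name: str          # full object path, e.g. "landing/sales/part-0001.json"
    size: int          # object size in bytes
    updated: datetime  # last-modified timestamp


# Hypothetical thresholds; the operator's real values are not documented here.
MAX_TOTAL_BYTES = 50 * 1024 * 1024
MAX_FILE_COUNT = 100
MAX_AGE = timedelta(minutes=15)


def find_ready_batches(objects: List[ObjectInfo], now: datetime) -> Dict[str, List[str]]:
    """Group objects by directory and keep the directories where any threshold is exceeded."""
    by_directory: Dict[str, List[ObjectInfo]] = {}
    for obj in objects:
        by_directory.setdefault(posixpath.dirname(obj.name), []).append(obj)

    ready: Dict[str, List[str]] = {}
    for directory, items in by_directory.items():
        total_bytes = sum(item.size for item in items)
        oldest_age = now - min(item.updated for item in items)
        if total_bytes > MAX_TOTAL_BYTES or len(items) > MAX_FILE_COUNT or oldest_age > MAX_AGE:
            # Filenames only, matching the `files` argument of send_to_process.
            ready[directory] = [posixpath.basename(item.name) for item in items]
    return ready
```

Each key/value pair of the result lines up with the (directory, files) arguments that send_to_process takes, so a caller could loop over it and publish one message per ready directory.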
__init__()
Initializes the BatchWriteDetectOperator.
Sets up FileHook and GcsHook.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Type | Description |
---|---|
dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the chained message data and the first topic. |
execute(data, topic)
Executes the batch detection logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | A dictionary containing parameters: bucket (str, optional): The GCS bucket. Defaults to | required |
topic | str | The Pub/Sub topic to publish messages to (unused in the current logic). | required |
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Type | Description |
---|---|
str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
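run hands the event to execute(data, topic), but the decoding step itself is not documented on this page. Assuming the operators are triggered through Pub/Sub, whose CloudEvent body carries a base64-encoded message.data field and a messageId, that step could look roughly like the sketch below. decode_pubsub_event is a hypothetical helper, and the JSON encoding of the payload is an assumption; the library's run and extract_message_id may differ.

```python
import base64
import json
from typing import Any, Dict, Tuple


def decode_pubsub_event(event_data: Dict[str, Any]) -> Tuple[Dict[str, Any], str]:
    """Return (payload, message_id) from the body of a Pub/Sub 'messagePublished' CloudEvent.

    Hypothetical helper: assumes the published message data is UTF-8 JSON.
    """
    message = event_data["message"]
    # Pub/Sub delivers message.data as a base64-encoded string.
    payload = json.loads(base64.b64decode(message["data"]).decode("utf-8"))
    return payload, message["messageId"]
```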
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |
send_to_process(bucket, directory, files)
Sends a message to Pub/Sub to process a batch of files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The GCS bucket containing the files. | required |
directory | str | The common directory (prefix) of the files. | required |
files | list | A list of filenames in the batch. | required |
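The keys bucket, directory, and files match the data parameter documented for BatchWriteProcessOperator.execute, which suggests that send_to_process publishes a payload of this shape for the processing operator to consume. That correspondence, the JSON encoding, and the helper name build_batch_message are assumptions in this sketch.

```python
import json
from typing import List


def build_batch_message(bucket: str, directory: str, files: List[str]) -> bytes:
    """Serialize a batch description as UTF-8 JSON bytes (Pub/Sub message data must be bytes).

    Hypothetical helper: the key names mirror BatchWriteProcessOperator.execute's `data`.
    """
    payload = {"bucket": bucket, "directory": directory, "files": files}
    return json.dumps(payload).encode("utf-8")
```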
BatchWriteProcessOperator
Bases: GoogleBaseEventOperator
Operator to process batches of files from GCS.
This operator reads multiple JSON files from a specified GCS location, merges their content into a single local NDJSON file, uploads the merged file to another GCS location, and then moves the original processed files to an archive location.
Note
This class is deprecated and will be removed in a future version.
Use GcsDatalakeHook for writing files directly to the datalake.
__init__()
Initializes the BatchWriteProcessOperator.
Sets up FileHook and GcsHook.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Type | Description |
---|---|
dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the chained message data and the first topic. |
execute(data, topic)
Executes the batch processing logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | A dictionary containing parameters: bucket (str): The source GCS bucket; directory (str): The directory within the source bucket; files (list): A list of filenames to process. | required |
topic | str | The Pub/Sub topic from which the message was received (unused). | required |
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Type | Description |
---|---|
str | The extracted message ID. |
merge_files(file_contents)
Merges file contents into a local NDJSON file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_contents | list | A list of dictionaries or objects to write. | required |
Returns:
Type | Description |
---|---|
str | The path to the created local NDJSON file. |
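merge_files writes its input as newline-delimited JSON, one JSON document per line. The sketch below shows that format under the assumptions that every item is JSON-serializable and that a temporary file is an acceptable destination; merge_to_ndjson is a hypothetical name, and the real method may choose its output path differently.

```python
import json
import tempfile
from typing import Any, Iterable


def merge_to_ndjson(records: Iterable[Any]) -> str:
    """Write each record as one JSON line into a new local file and return its path."""
    # delete=False keeps the file on disk after closing so it can be uploaded afterwards.
    with tempfile.NamedTemporaryFile(
        "w", suffix=".ndjson", delete=False, encoding="utf-8"
    ) as handle:
        for record in records:
            handle.write(json.dumps(record, ensure_ascii=False) + "\n")
        return handle.name
```

Because the file is created with delete=False, the caller is responsible for removing it once the upload has finished.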
read_files(bucket, directory, files)
Reads multiple JSON files from GCS and concatenates their contents.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The GCS bucket. | required |
directory | str | The directory within the bucket. | required |
files | list | A list of filenames to read. | required |
Returns:
Type | Description |
---|---|
list | A list containing the combined content of the JSON files. |
Raises:
Type | Description |
---|---|
Exception | If the content of a file is neither a list nor a dict. |
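read_files concatenates the contents of several JSON files and rejects anything that is neither a list nor a dict. A minimal sketch of the combining step follows. It assumes the files have already been downloaded as strings, that list contents are spliced in while a dict is appended as a single record (the page does not spell this out), and it raises ValueError, which is a subclass of the documented Exception. The helper name is hypothetical.

```python
import json
from typing import Any, List


def combine_json_contents(raw_contents: List[str]) -> List[Any]:
    """Parse each JSON document and merge them into one flat list of records."""
    combined: List[Any] = []
    for raw in raw_contents:
        content = json.loads(raw)
        if isinstance(content, list):
            combined.extend(content)  # a file holding many records
        elif isinstance(content, dict):
            combined.append(content)  # a file holding a single record
        else:
            raise ValueError(f"Unsupported JSON content type: {type(content).__name__}")
    return combined
```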
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |
FileDeleteOperator
Bases: GoogleBaseEventOperator
Operator to delete files or prefixes in a GCS bucket.
__init__()
Initializes the FileDeleteOperator.
Sets up the GcsHook.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Type | Description |
---|---|
dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the chained message data and the first topic. |
execute(data, topic)
Executes the file deletion logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | A dictionary containing parameters: bucket (str): The GCS bucket; prefix (str, optional): The prefix to delete; files (list, optional): A list of specific filepaths to delete. | required |
topic | str | The Pub/Sub topic (unused). | required |
Raises:
Type | Description |
---|---|
Exception | If neither 'prefix' nor 'files' is provided. |
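execute accepts either a prefix or an explicit list of files and raises when neither is present. The sketch below only resolves which object names would be deleted from such a data dict. The listing function is injected so the logic runs without GCS; the helper name, the ValueError, and the choice to honour both keys when both are given are assumptions.

```python
from typing import Any, Callable, Dict, Iterable, List


def resolve_deletions(
    data: Dict[str, Any], list_names: Callable[[str, str], Iterable[str]]
) -> List[str]:
    """Return the object names selected by data['prefix'] and/or data['files']."""
    bucket = data["bucket"]
    prefix = data.get("prefix")
    files = data.get("files")
    if not prefix and not files:
        raise ValueError("Either 'prefix' or 'files' must be provided.")
    names: List[str] = []
    if prefix:
        names.extend(list_names(bucket, prefix))  # every object under the prefix
    if files:
        names.extend(files)  # explicitly listed objects
    return names
```

With the google-cloud-storage client, list_names could be backed by Client.list_blobs(bucket, prefix=prefix), yielding each blob's name.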
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Type | Description |
---|---|
str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |
FileDetectOperator
Bases: GoogleBaseFileOperator
Operator to detect files in GCS and send messages to a queue.
This operator is triggered when a new file is detected in a GCS bucket. It reads the ingestion configuration for the file, builds success messages, and publishes them to the specified Google Cloud Pub/Sub topic.
__init__()
Initializes the FileDetectOperator.
Sets up the GCS hook for interacting with Google Cloud Storage.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to file operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Type | Description |
---|---|
dict | A constructed error message. |
build_success_message(bucket, filepath)
Builds success messages based on the file's ingestion configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The GCS bucket of the file. | required |
filepath | str | The path to the file. | required |
Returns:
Type | Description |
---|---|
list | A list of dictionaries, each representing a success message containing metadata for file processing. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the chained message data and the first topic. |
execute(bucket, filepath)
Executes the operator's logic.
Builds success messages based on the detected file and its configuration, then publishes these messages to the configured Pub/Sub topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The GCS bucket where the file was detected. | required |
filepath | str | The path to the detected file within the bucket. | required |
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Type | Description |
---|---|
str | The extracted message ID. |
get_ingest_config(filepath)
Retrieves ingestion configuration for a given filepath.
Parses the filepath to determine the dataset, table, and mode, then reads the configuration file for that dataset and table from GCS. The config file may contain a single configuration or multiple configurations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filepath | str | The GCS path to the file. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing various configuration parameters. |
Raises:
Type | Description |
---|---|
NotImplementedError | If the metadata format in the config file is neither a list nor a dict. |
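Because a config file may hold one configuration or several, and any other shape raises NotImplementedError, a natural approach is to normalize the parsed metadata to a list before iterating. The sketch assumes a single configuration is a dict and multiple configurations arrive as a list; normalize_ingest_configs is a hypothetical helper, not the operator's code.

```python
from typing import Any, Dict, List


def normalize_ingest_configs(metadata: Any) -> List[Dict[str, Any]]:
    """Accept a single config (dict) or several (list) and always return a list."""
    if isinstance(metadata, dict):
        return [metadata]
    if isinstance(metadata, list):
        return metadata
    raise NotImplementedError(f"Unsupported config format: {type(metadata).__name__}")
```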
read_config_file(dataset, table)
Reads the ingestion configuration file from GCS.
If the config file is not found, returns a default configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset | str | The dataset name. | required |
table | str | The table name. | required |
Returns:
Type | Description |
---|---|
dict or list | The configuration data, or a default configuration if the file is not found. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes file logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the file. | required |
split_filepath(filepath)
Splits a GCS filepath into dataset, table, and mode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filepath | str | The GCS filepath string. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing dataset (str), table (str), and mode (str). |
Raises:
Type | Description |
---|---|
Exception | If the filepath format is invalid. |
FileMoveOperator
Bases: GoogleBaseEventOperator
Operator to move files or prefixes within or between GCS buckets.
__init__()
Initializes the FileMoveOperator.
Sets up the GcsHook.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Type | Description |
---|---|
dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the chained message data and the first topic. |
execute(data, topic)
Executes the file moving logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | A dictionary containing parameters: origin (dict): Contains 'bucket' (str) and 'prefix' (str) for the source; destination (dict): Contains 'bucket' (str) and 'directory' (str) for the destination. | required |
topic | str | The Pub/Sub topic (unused). | required |
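The page does not say how destination object names are derived from the origin prefix and the destination directory. This sketch assumes each object keeps only its basename under the destination directory, and plan_moves is a hypothetical helper that computes the moves without performing them.

```python
import posixpath
from typing import Any, Dict, List, Tuple

# (source bucket, source name, destination bucket, destination name)
Move = Tuple[str, str, str, str]


def plan_moves(data: Dict[str, Any], object_names: List[str]) -> List[Move]:
    """Pair every object under origin['prefix'] with its destination path."""
    origin, destination = data["origin"], data["destination"]
    moves: List[Move] = []
    for name in object_names:
        if not name.startswith(origin["prefix"]):
            continue  # skip objects outside the requested prefix
        # Assumption: only the basename is kept under the destination directory.
        target = posixpath.join(destination["directory"], posixpath.basename(name))
        moves.append((origin["bucket"], name, destination["bucket"], target))
    return moves
```

With google-cloud-storage, each planned tuple could then be carried out with Bucket.copy_blob followed by deleting the source object.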
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Type | Description |
---|---|
str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |
FileUrlToGcsOperator
Bases: GoogleBaseEventOperator
Operator for transferring files from a URL to GCS.
__init__() -> None
Initializes the FileUrlToGcsOperator.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Type | Description |
---|---|
dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the chained message data and the first topic. |
execute(data: Dict[str, Any], topic: str) -> None
Executes the file transfer from URL to GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Dict[str, Any] | The data containing URL and GCS information. | required |
topic | str | The Pub/Sub topic. | required |
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Type | Description |
---|---|
str | The extracted message ID. |
move_to_destinations(local_filepath: str, destination: Union[Dict[str, Any], List[Dict[str, Any]]]) -> None
Moves the downloaded file to the specified destinations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_filepath | str | The local file path. | required |
destination | Union[Dict[str, Any], List[Dict[str, Any]]] | The destination(s) for the file. | required |
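move_to_destinations accepts either one destination dict or a list of them. A common way to handle such a Union parameter is to normalize it to a list up front so that the single-destination and multi-destination cases share one loop. The sketch assumes that approach; the helper name and the TypeError for other inputs are hypothetical, and the keys inside each destination dict are not documented here.

```python
from typing import Any, Dict, List, Union

Destination = Dict[str, Any]


def as_destination_list(destination: Union[Destination, List[Destination]]) -> List[Destination]:
    """Normalize the 'destination' argument so callers can always iterate over a list."""
    if isinstance(destination, dict):
        return [destination]
    if isinstance(destination, list) and all(isinstance(d, dict) for d in destination):
        return destination
    raise TypeError("destination must be a dict or a list of dicts")
```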
remove_null_byte(local_filepath: str) -> None
Removes null bytes from the specified file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_filepath | str | The local file path. | required |
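remove_null_byte strips NUL (0x00) bytes from the downloaded file. A minimal sketch, assuming the whole file fits in memory and may be rewritten in place; strip_null_bytes is a hypothetical name, and the operator may stream the file instead.

```python
def strip_null_bytes(local_filepath: str) -> None:
    """Rewrite the file in place without any NUL (0x00) bytes."""
    # Reads the entire file into memory; a streaming copy would suit very large files better.
    with open(local_filepath, "rb") as handle:
        content = handle.read()
    with open(local_filepath, "wb") as handle:
        handle.write(content.replace(b"\x00", b""))
```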
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |
FtpToGcsOperator
Bases: FileUrlToGcsOperator
Operator for transferring files from FTP to GCS.
__init__() -> None
Initializes the FtpToGcsOperator.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Type | Description |
---|---|
dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the chained message data and the first topic. |
execute(data: Dict[str, str], topic: str) -> None
Executes the FTP to GCS transfer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Dict[str, str] | The data containing FTP and GCS information. | required |
topic | str | The Pub/Sub topic. | required |
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Type | Description |
---|---|
str | The extracted message ID. |
move_to_destinations(local_filepath: str, destination: Union[Dict[str, Any], List[Dict[str, Any]]]) -> None
Moves the downloaded file to the specified destinations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_filepath | str | The local file path. | required |
destination | Union[Dict[str, Any], List[Dict[str, Any]]] | The destination(s) for the file. | required |
remove_null_byte(local_filepath: str) -> None
Removes null bytes from the specified file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_filepath | str | The local file path. | required |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |
GoogleErrorReprocessOperator
Bases: GoogleBaseEventOperator, ErrorReprocessOperator
Operator for reprocessing errors in Google Cloud.
__init__() -> None
Initializes the GoogleErrorReprocessOperator.
build_error_message(message: str, data: dict) -> dict
Builds an error message specific to event operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message. | required |
data | dict | The associated data. | required |
Returns:
Type | Description |
---|---|
dict | A constructed error message. |
chain_messages(messages: list) -> tuple
Chains messages together for processing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
messages | list | A list of messages to chain. | required |
Returns:
Type | Description |
---|---|
tuple | A tuple containing the chained message data and the first topic. |
execute(data, topic)
Executes the error processing logic for the given data.
This method retrieves the necessary metadata from the input data and handles retries according to the specified parameters: it either republishes the event to its original topic or, once the maximum number of retries has been exceeded, saves the error details to the datalake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | dict | The event data that needs to be processed. | required |
topic | str | The topic from which the event was received. | required |
Raises:
Type | Description |
---|---|
KeyError | If required keys are missing from the data dictionary. |
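The retry decision can be sketched as a pure function that either schedules a republish with an incremented counter or hands the event off for storage. The key names topic, retries, and max_retries, the default limit of 3, and the helper name are all hypothetical, and the actual Pub/Sub publish and datalake write are left out.

```python
from typing import Any, Dict, Tuple

DEFAULT_MAX_RETRIES = 3  # hypothetical default limit
REQUIRED_KEYS = ("topic", "retries")  # hypothetical key names


def plan_reprocess(data: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return ("republish", updated_data) while retries remain, else ("save_to_datalake", data)."""
    for key in REQUIRED_KEYS:
        if key not in data:
            raise KeyError(key)  # mirrors the documented KeyError on missing keys
    max_retries = data.get("max_retries", DEFAULT_MAX_RETRIES)
    if data["retries"] < max_retries:
        # Build a new dict so the caller's data is not mutated, counting this attempt.
        return "republish", {**data, "retries": data["retries"] + 1}
    return "save_to_datalake", data
```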
extract_message_id(cloud_event) -> str
Extracts the message ID from the cloud event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event from which to extract the message ID. | required |
Returns:
Type | Description |
---|---|
str | The extracted message ID. |
report_error(message: str, data: dict = None)
Reports an error by logging it and publishing to a queue.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message | str | The error message to report. | required |
data | dict | Additional data associated with the error. | None |
run(cloud_event) -> None
Processes the incoming cloud event and executes event logic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cloud_event | CloudEvent | The cloud event containing metadata about the event. | required |
run_next(tasks: list) -> None
Executes the next tasks in the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tasks | list | A list of tasks to execute next. | required |