hook

__all__ = ['GcsHook', 'GcsDatalakeHook'] (module attribute)

GcsDatalakeHook

Bases: GcsHook, DatalakeHook

Hook for interacting with GCS Datalake.

__init__() -> None

Initializes the GcsDatalakeHook.

build_filepath(bucket: str, filepath: str) -> str

Builds the full GCS file path.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required

Returns:

Name Type Description
str str

The full GCS file path.
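
The page does not show what the "full GCS file path" looks like. The following is a minimal sketch, assuming it is the conventional `gs://<bucket>/<object>` URI. The function name `build_gcs_uri` is a hypothetical stand-in and not the hook's own implementation.

```python
def build_gcs_uri(bucket: str, filepath: str) -> str:
    """Join a bucket name and an object path into a gs:// URI (assumed format)."""
    # Strip stray slashes so "bucket/" plus "/path" does not yield "gs://bucket//path".
    return f"gs://{bucket.strip('/')}/{filepath.lstrip('/')}"


print(build_gcs_uri("my-bucket", "/raw/events.json"))  # gs://my-bucket/raw/events.json
```

If the hook returns a different form (for example a bare `bucket/path` string), only the format string would change.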

build_metadata(message_id: Optional[int], origin: Optional[str]) -> Dict[str, Any]

Builds metadata for the data being sent.

Parameters:

Name Type Description Default
message_id Optional[int]

The message ID.

required
origin Optional[str]

The origin of the data.

required

Returns:

Type Description
Dict[str, Any]

The metadata dictionary.

check_existance(bucket: str, filepath: str) -> bool

Checks if a file exists in GCS.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required

Returns:

Name Type Description
bool bool

True if the file exists, False otherwise.

copy_blobs(bucket: storage.Bucket, blobs: List[storage.Blob], to_bucket: storage.Bucket, to_directory: str) -> None

Copies blobs from one bucket to another.

Parameters:

Name Type Description Default
bucket Bucket

The source bucket.

required
blobs List[Blob]

The list of blobs to copy.

required
to_bucket Bucket

The destination bucket.

required
to_directory str

The destination directory.

required

delete(bucket_name: str, prefix: Optional[str] = None, files: Optional[List[str]] = None) -> None

Deletes files from GCS.

Parameters:

Name Type Description Default
bucket_name str

The name of the GCS bucket.

required
prefix Optional[str]

The prefix for files to delete. Defaults to None.

None
files Optional[List[str]]

The list of specific files to delete. Defaults to None.

None
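
The signature accepts both `prefix` and `files`, but the page does not say how they combine. The sketch below assumes a prefix matches every object whose name starts with it, `files` matches exact names, and the two selections are merged. It works on a plain list of names; `select_for_deletion` is a hypothetical helper, not part of the hook.

```python
from typing import List, Optional


def select_for_deletion(
    object_names: List[str],
    prefix: Optional[str] = None,
    files: Optional[List[str]] = None,
) -> List[str]:
    """Return the names that a prefix and/or an explicit file list would match."""
    wanted = set(files or [])
    selected = []
    for name in object_names:
        by_prefix = prefix is not None and name.startswith(prefix)
        if by_prefix or name in wanted:
            selected.append(name)
    return selected


names = ["tmp/a.json", "tmp/b.json", "keep/c.json"]
print(select_for_deletion(names, prefix="tmp/"))         # ['tmp/a.json', 'tmp/b.json']
print(select_for_deletion(names, files=["keep/c.json"]))  # ['keep/c.json']
```

In this sketch, passing neither argument selects nothing. Whether the hook behaves the same way is not stated on this page, so check before calling `delete` with both defaults.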

delete_blobs(blobs: List[storage.Blob]) -> None

Deletes a list of blobs.

Parameters:

Name Type Description Default
blobs List[Blob]

The list of blobs to delete.

required

download(bucket: str, filepath: str, target_filepath: Optional[str] = None) -> None

Downloads a file from GCS.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required
target_filepath Optional[str]

The target file path. Defaults to None.

None

files_to_blobs(bucket: storage.Bucket, files: List[str]) -> List[storage.Blob]

Converts a list of file names to blobs.

Parameters:

Name Type Description Default
bucket Bucket

The GCS bucket.

required
files List[str]

The list of file names.

required

Returns:

Type Description
List[Blob]

The list of blobs.

list(bucket_name: str, prefix: Optional[str] = None) -> List[storage.Blob]

Lists blobs in a GCS bucket.

Parameters:

Name Type Description Default
bucket_name str

The name of the GCS bucket.

required
prefix Optional[str]

The prefix to filter blobs. Defaults to None.

None

Returns:

Type Description
List[Blob]

The list of blobs.

move(from_bucket: str, from_prefix: str, to_bucket: str, to_directory: str, rewrite: bool) -> None

Moves files from one GCS location to another.

Parameters:

Name Type Description Default
from_bucket str

The source bucket.

required
from_prefix str

The source prefix.

required
to_bucket str

The destination bucket.

required
to_directory str

The destination directory.

required
rewrite bool

Whether to overwrite existing files.

required
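
A move between storage locations is commonly expressed as a copy followed by a delete of the source. The sketch below illustrates that pattern with in-memory dictionaries standing in for buckets. It is not the hook's implementation: the `Bucket` alias, `move_prefix`, and the rule that an object keeps its base name under `to_directory` are all assumptions, and the `rewrite` flag is left out because the page does not describe its effect in detail.

```python
import posixpath
from typing import Dict

Bucket = Dict[str, bytes]  # stand-in for a bucket: object name -> content


def move_prefix(src: Bucket, from_prefix: str, dst: Bucket, to_directory: str) -> None:
    """Copy every object under from_prefix into to_directory, then delete the source."""
    # Collect the matching names first because the dict is mutated inside the loop.
    for name in [n for n in src if n.startswith(from_prefix)]:
        # Assumption: the object keeps its base name under the destination directory.
        target = posixpath.join(to_directory, posixpath.basename(name))
        dst[target] = src[name]  # copy
        del src[name]            # delete the source only after the copy


landing: Bucket = {"in/a.csv": b"1", "in/b.csv": b"2", "other/c.csv": b"3"}
archive: Bucket = {}
move_prefix(landing, "in/", archive, "done")
print(sorted(archive), sorted(landing))
```

Deleting only after the copy means an interrupted move leaves a duplicate rather than a missing object.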

move_blobs(bucket: storage.Bucket, blobs: List[storage.Blob], to_bucket: storage.Bucket, to_directory: str, rewrite: bool) -> None

Moves blobs from one bucket to another.

Parameters:

Name Type Description Default
bucket Bucket

The source bucket.

required
blobs List[Blob]

The list of blobs to move.

required
to_bucket Bucket

The destination bucket.

required
to_directory str

The destination directory.

required
rewrite bool

Whether to overwrite existing files.

required

move_files(from_bucket: str, files: List[str], to_bucket: str, to_directory: str, rewrite: bool) -> None

Moves specified files from one GCS location to another.

Parameters:

Name Type Description Default
from_bucket str

The source bucket.

required
files List[str]

The list of files to move.

required
to_bucket str

The destination bucket.

required
to_directory str

The destination directory.

required
rewrite bool

Whether to overwrite existing files.

required

prepare_row(row: Any, metadata: Dict[str, Any], now: datetime) -> Dict[str, Any]

Prepares a row for insertion into the datalake.

Parameters:

Name Type Description Default
row Any

The row data.

required
metadata Dict[str, Any]

The metadata for the row.

required
now datetime

The current timestamp.

required

Returns:

Type Description
Dict[str, Any]

The prepared row.

prepare_rows(data: Any, metadata: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], datetime]

Prepares multiple rows for insertion into the datalake.

Parameters:

Name Type Description Default
data Any

The data to prepare.

required
metadata Dict[str, Any]

The metadata for the rows.

required

Returns:

Type Description
Tuple[List[Dict[str, Any]], datetime]

The prepared rows and the current timestamp.
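
To make the relationship between `prepare_rows` and `prepare_row` concrete, here is a minimal sketch of how rows might be enriched with metadata and one shared timestamp. The metadata keys `message_id` and `origin` mirror the parameters of `build_metadata`; the `ingested_at` column name and the handling of a single dictionary as a one-row batch are assumptions made for this example.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple


def prepare_row(row: Dict[str, Any], metadata: Dict[str, Any], now: datetime) -> Dict[str, Any]:
    """Return a copy of row extended with metadata and an ingestion timestamp."""
    # "ingested_at" is a hypothetical column name chosen for this sketch.
    return {**row, **metadata, "ingested_at": now.isoformat()}


def prepare_rows(data: Any, metadata: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], datetime]:
    """Stamp every row with the same timestamp and return the rows together with it."""
    now = datetime.now(timezone.utc)
    rows = data if isinstance(data, list) else [data]  # assumption: one dict means one row
    return [prepare_row(r, metadata, now) for r in rows], now


rows, stamp = prepare_rows([{"id": 1}, {"id": 2}], {"message_id": 42, "origin": "crm"})
print(rows[0]["origin"], rows[0]["ingested_at"] == rows[1]["ingested_at"])  # crm True
```

Returning the timestamp alongside the rows lets the caller reuse the same value, for example in a file name, without calling the clock a second time.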

read_as_bytes(bucket: str, filepath: str) -> bytes

Reads a file from GCS as bytes.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required

Returns:

Name Type Description
bytes bytes

The content of the file as bytes.

read_as_string(bucket: str, filepath: str, encoding: Optional[str] = None) -> str

Reads a file from GCS as a string.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required
encoding Optional[str]

The encoding to use. Defaults to None.

None

Returns:

Name Type Description
str str

The content of the file as a string.

read_json(bucket: str, filepath: str, encoding: Optional[str] = None) -> Any

Reads a JSON file from GCS.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required
encoding Optional[str]

The encoding to use. Defaults to None.

None

Returns:

Name Type Description
Any Any

The content of the JSON file.

read_ndjson(bucket: str, filepath: str, encoding: Optional[str] = None) -> List[Any]

Reads an NDJSON file from GCS.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required
encoding Optional[str]

The encoding to use. Defaults to None.

None

Returns:

Type Description
List[Any]

The content of the NDJSON file.
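
NDJSON (newline-delimited JSON) stores one JSON value per line, which is why the return type is a list. The sketch below shows the parsing step on a string that has already been read; `parse_ndjson` is a hypothetical helper, and skipping blank lines is a choice made for this example.

```python
import json
from typing import Any, List


def parse_ndjson(text: str) -> List[Any]:
    """Parse newline-delimited JSON, skipping blank lines."""
    # Split on "\n" only: str.splitlines() would also break on characters such as
    # U+2028, which may legally appear unescaped inside a JSON string.
    return [json.loads(line) for line in text.split("\n") if line.strip()]


payload = '{"id": 1}\n{"id": 2}\n\n'
print(parse_ndjson(payload))  # [{'id': 1}, {'id': 2}]
```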

rewrite_blobs(blobs: List[storage.Blob], to_bucket: storage.Bucket, to_directory: str) -> None

Rewrites blobs in the destination bucket.

Parameters:

Name Type Description Default
blobs List[Blob]

The list of blobs to rewrite.

required
to_bucket Bucket

The destination bucket.

required
to_directory str

The destination directory.

required

send_to_landing_zone(data: Any, dataset: str, table: str, message_id: Optional[int], origin: Optional[str], time_partition: bool = False) -> Union[str, None]

Sends data to the landing zone in GCS.

Parameters:

Name Type Description Default
data Any

The data to send.

required
dataset str

The dataset name.

required
table str

The table name.

required
message_id Optional[int]

The message ID.

required
origin Optional[str]

The origin of the data.

required
time_partition bool

Whether to use time partitioning. Defaults to False.

False

Returns:

Type Description
Union[str, None]

The path to the uploaded file or None.

upload(local_filepath: str, bucket_name: str, directory: str) -> str

Uploads a local file to GCS.

Parameters:

Name Type Description Default
local_filepath str

The path to the local file.

required
bucket_name str

The name of the GCS bucket.

required
directory str

The directory within the bucket.

required

Returns:

Name Type Description
str str

The path to the uploaded file in GCS.

upload_folder(local_path: str, bucket: str, gcs_path: str) -> None

Uploads a folder to GCS.

Parameters:

Name Type Description Default
local_path str

The local folder path.

required
bucket str

The name of the GCS bucket.

required
gcs_path str

The GCS path to upload to.

required

upload_from_memory(data: Any, bucket: str, directory: str, filename: str, **kwargs: Any) -> str

Uploads data from memory to GCS.

Parameters:

Name Type Description Default
data Any

The data to upload.

required
bucket str

The name of the GCS bucket.

required
directory str

The directory within the bucket.

required
filename str

The name of the file to create.

required

Kwargs:

add_timestamp (bool, optional): If True, adds a timestamp to the filename. Defaults to True.
use_ndjson (bool, optional): If True, writes data in NDJSON format. Defaults to False.
mode (str, optional): The mode for opening the file. Defaults to 'w'.

Returns:

Name Type Description
str str

The path to the uploaded file.
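
The `add_timestamp` and `use_ndjson` keyword arguments change the object name and the serialized body respectively. The sketch below illustrates one way that could work. The timestamp format, its position before the extension, and the helper name `render_upload` are assumptions; the extra `now` parameter exists only to make the example deterministic.

```python
import json
import posixpath
from datetime import datetime, timezone
from typing import Any, Optional, Tuple


def render_upload(
    data: Any,
    filename: str,
    add_timestamp: bool = True,
    use_ndjson: bool = False,
    now: Optional[datetime] = None,
) -> Tuple[str, str]:
    """Return the (object name, text body) an in-memory upload could produce."""
    if add_timestamp:
        now = now or datetime.now(timezone.utc)
        stem, ext = posixpath.splitext(filename)
        # Assumed naming scheme: <stem>_<UTC timestamp><ext>
        filename = f"{stem}_{now:%Y%m%dT%H%M%S}{ext}"
    if use_ndjson:
        # One JSON document per line; data must be an iterable of rows.
        body = "\n".join(json.dumps(row) for row in data)
    else:
        body = json.dumps(data)
    return filename, body


fixed = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
name, body = render_upload([{"a": 1}, {"a": 2}], "events.json", use_ndjson=True, now=fixed)
print(name)  # events_20240102T030405.json
print(body)
```

Because `add_timestamp` defaults to True according to the description above, callers that need a stable object name would have to pass `add_timestamp=False` explicitly.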

upload_parquet_from_memory(data: Any, bucket: str, directory: str, filename: str, **kwargs: Any) -> str

Uploads Parquet data from memory to GCS.

Parameters:

Name Type Description Default
data Any

The data to upload.

required
bucket str

The name of the GCS bucket.

required
directory str

The directory within the bucket.

required
filename str

The name of the Parquet file to create.

required

Kwargs:

schema (pa.Schema, optional): The schema for the Parquet table. Defaults to None.
add_timestamp (bool, optional): If True, adds a timestamp to the filename. Defaults to True.

Returns:

Name Type Description
str str

The path to the uploaded Parquet file.

GcsHook

Bases: BaseHook

Hook for interacting with Google Cloud Storage.

__init__() -> None

Initializes the GcsHook.

build_filepath(bucket: str, filepath: str) -> str

Builds the full GCS file path.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required

Returns:

Name Type Description
str str

The full GCS file path.

check_existance(bucket: str, filepath: str) -> bool

Checks if a file exists in GCS.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required

Returns:

Name Type Description
bool bool

True if the file exists, False otherwise.

copy_blobs(bucket: storage.Bucket, blobs: List[storage.Blob], to_bucket: storage.Bucket, to_directory: str) -> None

Copies blobs from one bucket to another.

Parameters:

Name Type Description Default
bucket Bucket

The source bucket.

required
blobs List[Blob]

The list of blobs to copy.

required
to_bucket Bucket

The destination bucket.

required
to_directory str

The destination directory.

required

delete(bucket_name: str, prefix: Optional[str] = None, files: Optional[List[str]] = None) -> None

Deletes files from GCS.

Parameters:

Name Type Description Default
bucket_name str

The name of the GCS bucket.

required
prefix Optional[str]

The prefix for files to delete. Defaults to None.

None
files Optional[List[str]]

The list of specific files to delete. Defaults to None.

None

delete_blobs(blobs: List[storage.Blob]) -> None

Deletes a list of blobs.

Parameters:

Name Type Description Default
blobs List[Blob]

The list of blobs to delete.

required

download(bucket: str, filepath: str, target_filepath: Optional[str] = None) -> None

Downloads a file from GCS.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required
target_filepath Optional[str]

The target file path. Defaults to None.

None

files_to_blobs(bucket: storage.Bucket, files: List[str]) -> List[storage.Blob]

Converts a list of file names to blobs.

Parameters:

Name Type Description Default
bucket Bucket

The GCS bucket.

required
files List[str]

The list of file names.

required

Returns:

Type Description
List[Blob]

The list of blobs.

list(bucket_name: str, prefix: Optional[str] = None) -> List[storage.Blob]

Lists blobs in a GCS bucket.

Parameters:

Name Type Description Default
bucket_name str

The name of the GCS bucket.

required
prefix Optional[str]

The prefix to filter blobs. Defaults to None.

None

Returns:

Type Description
List[Blob]

The list of blobs.

move(from_bucket: str, from_prefix: str, to_bucket: str, to_directory: str, rewrite: bool) -> None

Moves files from one GCS location to another.

Parameters:

Name Type Description Default
from_bucket str

The source bucket.

required
from_prefix str

The source prefix.

required
to_bucket str

The destination bucket.

required
to_directory str

The destination directory.

required
rewrite bool

Whether to overwrite existing files.

required

move_blobs(bucket: storage.Bucket, blobs: List[storage.Blob], to_bucket: storage.Bucket, to_directory: str, rewrite: bool) -> None

Moves blobs from one bucket to another.

Parameters:

Name Type Description Default
bucket Bucket

The source bucket.

required
blobs List[Blob]

The list of blobs to move.

required
to_bucket Bucket

The destination bucket.

required
to_directory str

The destination directory.

required
rewrite bool

Whether to overwrite existing files.

required

move_files(from_bucket: str, files: List[str], to_bucket: str, to_directory: str, rewrite: bool) -> None

Moves specified files from one GCS location to another.

Parameters:

Name Type Description Default
from_bucket str

The source bucket.

required
files List[str]

The list of files to move.

required
to_bucket str

The destination bucket.

required
to_directory str

The destination directory.

required
rewrite bool

Whether to overwrite existing files.

required

read_as_bytes(bucket: str, filepath: str) -> bytes

Reads a file from GCS as bytes.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required

Returns:

Name Type Description
bytes bytes

The content of the file as bytes.

read_as_string(bucket: str, filepath: str, encoding: Optional[str] = None) -> str

Reads a file from GCS as a string.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required
encoding Optional[str]

The encoding to use. Defaults to None.

None

Returns:

Name Type Description
str str

The content of the file as a string.
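
The `encoding` parameter matters whenever the stored file is not UTF-8. The sketch below shows the decoding step on bytes that have already been read, assuming that `None` falls back to UTF-8; the page does not state the actual fallback, and `decode_blob` is a hypothetical helper.

```python
from typing import Optional


def decode_blob(content: bytes, encoding: Optional[str] = None) -> str:
    """Decode downloaded bytes; None falls back to UTF-8 in this sketch."""
    return content.decode(encoding or "utf-8")


raw = "café".encode("latin-1")
print(decode_blob(raw, encoding="latin-1"))  # café
```

Decoding the same bytes as UTF-8 would raise `UnicodeDecodeError`, so pass the encoding explicitly for legacy exports.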

read_json(bucket: str, filepath: str, encoding: Optional[str] = None) -> Any

Reads a JSON file from GCS.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required
encoding Optional[str]

The encoding to use. Defaults to None.

None

Returns:

Name Type Description
Any Any

The content of the JSON file.

read_ndjson(bucket: str, filepath: str, encoding: Optional[str] = None) -> List[Any]

Reads an NDJSON file from GCS.

Parameters:

Name Type Description Default
bucket str

The name of the GCS bucket.

required
filepath str

The file path.

required
encoding Optional[str]

The encoding to use. Defaults to None.

None

Returns:

Type Description
List[Any]

The content of the NDJSON file.

rewrite_blobs(blobs: List[storage.Blob], to_bucket: storage.Bucket, to_directory: str) -> None

Rewrites blobs in the destination bucket.

Parameters:

Name Type Description Default
blobs List[Blob]

The list of blobs to rewrite.

required
to_bucket Bucket

The destination bucket.

required
to_directory str

The destination directory.

required

upload(local_filepath: str, bucket_name: str, directory: str) -> str

Uploads a local file to GCS.

Parameters:

Name Type Description Default
local_filepath str

The path to the local file.

required
bucket_name str

The name of the GCS bucket.

required
directory str

The directory within the bucket.

required

Returns:

Name Type Description
str str

The path to the uploaded file in GCS.

upload_folder(local_path: str, bucket: str, gcs_path: str) -> None

Uploads a folder to GCS.

Parameters:

Name Type Description Default
local_path str

The local folder path.

required
bucket str

The name of the GCS bucket.

required
gcs_path str

The GCS path to upload to.

required
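
Uploading a folder involves walking the local tree and deriving an object name for each file. The sketch below covers only that mapping step and performs no upload. It assumes the folder structure below `local_path` is preserved under `gcs_path`, which this page does not confirm; `plan_folder_upload` is a hypothetical helper.

```python
import os
import posixpath
import tempfile
from typing import Dict


def plan_folder_upload(local_path: str, gcs_path: str) -> Dict[str, str]:
    """Map each local file under local_path to the object name it would get."""
    plan = {}
    for root, _dirs, files in os.walk(local_path):
        for name in files:
            local_file = os.path.join(root, name)
            # Object names always use forward slashes, whatever the local OS uses.
            relative = os.path.relpath(local_file, local_path).replace(os.sep, "/")
            plan[local_file] = posixpath.join(gcs_path, relative)
    return plan


with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "nested"))
    for rel in ("a.txt", os.path.join("nested", "b.txt")):
        with open(os.path.join(tmp, rel), "w") as handle:
            handle.write("x")
    print(sorted(plan_folder_upload(tmp, "exports/2024").values()))
    # ['exports/2024/a.txt', 'exports/2024/nested/b.txt']
```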

upload_from_memory(data: Any, bucket: str, directory: str, filename: str, **kwargs: Any) -> str

Uploads data from memory to GCS.

Parameters:

Name Type Description Default
data Any

The data to upload.

required
bucket str

The name of the GCS bucket.

required
directory str

The directory within the bucket.

required
filename str

The name of the file to create.

required

Kwargs:

add_timestamp (bool, optional): If True, adds a timestamp to the filename. Defaults to True.
use_ndjson (bool, optional): If True, writes data in NDJSON format. Defaults to False.
mode (str, optional): The mode for opening the file. Defaults to 'w'.

Returns:

Name Type Description
str str

The path to the uploaded file.

upload_parquet_from_memory(data: Any, bucket: str, directory: str, filename: str, **kwargs: Any) -> str

Uploads Parquet data from memory to GCS.

Parameters:

Name Type Description Default
data Any

The data to upload.

required
bucket str

The name of the GCS bucket.

required
directory str

The directory within the bucket.

required
filename str

The name of the Parquet file to create.

required

Kwargs:

schema (pa.Schema, optional): The schema for the Parquet table. Defaults to None.
add_timestamp (bool, optional): If True, adds a timestamp to the filename. Defaults to True.

Returns:

Name Type Description
str str

The path to the uploaded Parquet file.