hook
__all__ = ['GcsHook', 'GcsDatalakeHook']
GcsDatalakeHook
Bases: GcsHook, DatalakeHook
Hook for interacting with GCS Datalake.
__init__() -> None
Initializes the GcsDatalakeHook.
build_filepath(bucket: str, filepath: str) -> str
Builds the full GCS file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
Returns:
Name | Type | Description |
---|---|---|
str | str | The full GCS file path. |
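As a rough illustration of what `build_filepath` might return, the sketch below joins a bucket name and an object path into a `gs://` URI. The `gs://` format, the slash handling, and the helper name `build_gcs_uri` are assumptions made for this example; they are not taken from the hook's source.

```python
def build_gcs_uri(bucket: str, filepath: str) -> str:
    """Join a bucket name and an object path into a gs:// URI (assumed format)."""
    # Strip stray slashes so the two parts are joined by exactly one "/".
    return f"gs://{bucket.strip('/')}/{filepath.lstrip('/')}"


uri = build_gcs_uri("my-bucket", "/landing/events.json")
```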
build_metadata(message_id: Optional[int], origin: Optional[str]) -> Dict[str, Any]
Builds metadata for the data being sent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message_id | Optional[int] | The message ID. | required |
origin | Optional[str] | The origin of the data. | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | The metadata dictionary. |
check_existance(bucket: str, filepath: str) -> bool
Checks if a file exists in GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
Returns:
Name | Type | Description |
---|---|---|
bool | bool | True if the file exists, False otherwise. |
copy_blobs(bucket: storage.Bucket, blobs: List[storage.Blob], to_bucket: storage.Bucket, to_directory: str) -> None
Copies blobs from one bucket to another.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | Bucket | The source bucket. | required |
blobs | List[Blob] | The list of blobs to copy. | required |
to_bucket | Bucket | The destination bucket. | required |
to_directory | str | The destination directory. | required |
delete(bucket_name: str, prefix: Optional[str] = None, files: Optional[List[str]] = None) -> None
Deletes files from GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket_name | str | The name of the GCS bucket. | required |
prefix | Optional[str] | The prefix for files to delete. Defaults to None. | None |
files | Optional[List[str]] | The list of specific files to delete. Defaults to None. | None |
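The reference does not say how `prefix` and `files` interact when both are given. The sketch below shows one plausible reading, in which the objects to delete are the union of the prefix matches and the explicitly named files. The helper `select_for_deletion` and the union behaviour are assumptions for illustration only.

```python
from typing import List, Optional


def select_for_deletion(
    names: List[str],
    prefix: Optional[str] = None,
    files: Optional[List[str]] = None,
) -> List[str]:
    """Pick object names matching the prefix or listed explicitly (assumed union)."""
    selected: List[str] = []
    for name in names:
        by_prefix = prefix is not None and name.startswith(prefix)
        by_name = files is not None and name in files
        if by_prefix or by_name:
            selected.append(name)
    return selected


objects = ["logs/a.txt", "logs/b.txt", "data/c.txt"]
to_delete = select_for_deletion(objects, prefix="logs/", files=["data/c.txt"])
```

With neither argument set, this sketch selects nothing, which avoids emptying a bucket by accident; whether the hook behaves the same way is not documented here.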
delete_blobs(blobs: List[storage.Blob]) -> None
Deletes a list of blobs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
blobs | List[Blob] | The list of blobs to delete. | required |
download(bucket: str, filepath: str, target_filepath: Optional[str] = None) -> None
Downloads a file from GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
target_filepath | Optional[str] | The target file path. Defaults to None. | None |
files_to_blobs(bucket: storage.Bucket, files: List[str]) -> List[storage.Blob]
Converts a list of file names to blobs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | Bucket | The GCS bucket. | required |
files | List[str] | The list of file names. | required |
Returns:
Type | Description |
---|---|
List[Blob] | The list of blobs. |
list(bucket_name: str, prefix: Optional[str] = None) -> List[storage.Blob]
Lists blobs in a GCS bucket.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket_name | str | The name of the GCS bucket. | required |
prefix | Optional[str] | The prefix to filter blobs. Defaults to None. | None |
Returns:
Type | Description |
---|---|
List[Blob] | The list of blobs. |
move(from_bucket: str, from_prefix: str, to_bucket: str, to_directory: str, rewrite: bool) -> None
Moves files from one GCS location to another.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_bucket | str | The source bucket. | required |
from_prefix | str | The source prefix. | required |
to_bucket | str | The destination bucket. | required |
to_directory | str | The destination directory. | required |
rewrite | bool | Whether to overwrite existing files. | required |
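A move needs a destination object name for every source object. The sketch below assumes the destination keeps only the source's base name and places it under `to_directory`; the hook may instead preserve the full source path, so treat both the helper name `destination_name` and this flattening rule as hypothetical.

```python
import posixpath


def destination_name(source_name: str, to_directory: str) -> str:
    """Place the source object's base name under the destination directory."""
    # GCS object names always use "/", so posixpath is used on every platform.
    return posixpath.join(to_directory.strip("/"), posixpath.basename(source_name))


moved = destination_name("raw/2024/events.json", "archive/")
```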
move_blobs(bucket: storage.Bucket, blobs: List[storage.Blob], to_bucket: storage.Bucket, to_directory: str, rewrite: bool) -> None
Moves blobs from one bucket to another.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | Bucket | The source bucket. | required |
blobs | List[Blob] | The list of blobs to move. | required |
to_bucket | Bucket | The destination bucket. | required |
to_directory | str | The destination directory. | required |
rewrite | bool | Whether to overwrite existing files. | required |
move_files(from_bucket: str, files: List[str], to_bucket: str, to_directory: str, rewrite: bool) -> None
Moves specified files from one GCS location to another.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_bucket | str | The source bucket. | required |
files | List[str] | The list of files to move. | required |
to_bucket | str | The destination bucket. | required |
to_directory | str | The destination directory. | required |
rewrite | bool | Whether to overwrite existing files. | required |
prepare_row(row: Any, metadata: Dict[str, Any], now: datetime) -> Dict[str, Any]
Prepares a row for insertion into the datalake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
row | Any | The row data. | required |
metadata | Dict[str, Any] | The metadata for the row. | required |
now | datetime | The current timestamp. | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | The prepared row. |
prepare_rows(data: Any, metadata: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], datetime]
Prepares multiple rows for insertion into the datalake.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Any | The data to prepare. | required |
metadata | Dict[str, Any] | The metadata for the rows. | required |
Returns:
Type | Description |
---|---|
Tuple[List[Dict[str, Any]], datetime] | The prepared rows and the current timestamp. |
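The signatures of `build_metadata`, `prepare_row`, and `prepare_rows` suggest a small pipeline: build one metadata dictionary, take one timestamp, and wrap every row with both. The sketch below shows that flow under stated assumptions: the envelope keys (`data`, `metadata`, `ingested_at`), the UTC timestamp, and the `_sketch` function names are all hypothetical and not taken from the hook.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple


def prepare_row_sketch(row: Any, metadata: Dict[str, Any], now: datetime) -> Dict[str, Any]:
    """Wrap one row in an envelope carrying metadata and a timestamp (assumed keys)."""
    return {"data": row, "metadata": metadata, "ingested_at": now.isoformat()}


def prepare_rows_sketch(
    data: Any, metadata: Dict[str, Any]
) -> Tuple[List[Dict[str, Any]], datetime]:
    """Prepare every row with one shared timestamp and return both."""
    now = datetime.now(timezone.utc)
    # Accept either a single record or a list of records.
    rows = data if isinstance(data, list) else [data]
    return [prepare_row_sketch(row, metadata, now) for row in rows], now


metadata = {"message_id": 7, "origin": "orders-api"}
prepared, stamp = prepare_rows_sketch([{"id": 1}, {"id": 2}], metadata)
```

Sharing a single timestamp across the batch keeps all rows of one message attributable to the same ingestion time, which is one reason `prepare_rows` may return it alongside the rows.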
read_as_bytes(bucket: str, filepath: str) -> bytes
Reads a file from GCS as bytes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
Returns:
Name | Type | Description |
---|---|---|
bytes | bytes | The content of the file as bytes. |
read_as_string(bucket: str, filepath: str, encoding: Optional[str] = None) -> str
Reads a file from GCS as a string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
encoding | Optional[str] | The encoding to use. Defaults to None. | None |
Returns:
Name | Type | Description |
---|---|---|
str | str | The content of the file as a string. |
read_json(bucket: str, filepath: str, encoding: Optional[str] = None) -> Any
Reads a JSON file from GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
encoding | Optional[str] | The encoding to use. Defaults to None. | None |
Returns:
Name | Type | Description |
---|---|---|
Any | Any | The content of the JSON file. |
read_ndjson(bucket: str, filepath: str, encoding: Optional[str] = None) -> List[Any]
Reads an NDJSON file from GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
encoding | Optional[str] | The encoding to use. Defaults to None. | None |
Returns:
Type | Description |
---|---|
List[Any] | The content of the NDJSON file. |
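NDJSON stores one JSON document per line, which is why `read_ndjson` returns a list while `read_json` returns a single value. The sketch below parses NDJSON from raw bytes; the helper name `parse_ndjson`, the UTF-8 fallback when `encoding` is None, and the skipping of blank lines are assumptions rather than documented behaviour of the hook.

```python
import json
from typing import Any, List, Optional


def parse_ndjson(payload: bytes, encoding: Optional[str] = None) -> List[Any]:
    """Decode the payload and parse each non-empty line as one JSON document."""
    text = payload.decode(encoding or "utf-8")
    return [json.loads(line) for line in text.splitlines() if line.strip()]


records = parse_ndjson(b'{"id": 1}\n\n{"id": 2}\n')
```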
rewrite_blobs(blobs: List[storage.Blob], to_bucket: storage.Bucket, to_directory: str) -> None
Rewrites blobs in the destination bucket.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
blobs | List[Blob] | The list of blobs to rewrite. | required |
to_bucket | Bucket | The destination bucket. | required |
to_directory | str | The destination directory. | required |
send_to_landing_zone(data: Any, dataset: str, table: str, message_id: Optional[int], origin: Optional[str], time_partition: bool = False) -> Union[str, None]
Sends data to the landing zone in GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Any | The data to send. | required |
dataset | str | The dataset name. | required |
table | str | The table name. | required |
message_id | Optional[int] | The message ID. | required |
origin | Optional[str] | The origin of the data. | required |
time_partition | bool | Whether to use time partitioning. Defaults to False. | False |
Returns:
Type | Description |
---|---|
Union[str, None] | The path to the uploaded file or None. |
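The reference does not describe how the landing zone is laid out or what `time_partition` changes. One common arrangement, sketched below, is a `dataset/table` directory that gains a date suffix when partitioning is on. The layout, the `YYYY/MM/DD` granularity, and the helper name `landing_directory` are all assumptions for illustration.

```python
from datetime import datetime, timezone


def landing_directory(
    dataset: str, table: str, now: datetime, time_partition: bool = False
) -> str:
    """Build a landing-zone directory, optionally partitioned by date (assumed layout)."""
    base = f"{dataset}/{table}"
    if time_partition:
        # Zero-padded year/month/day keeps lexical order equal to time order.
        return f"{base}/{now:%Y/%m/%d}"
    return base


when = datetime(2024, 3, 5, tzinfo=timezone.utc)
partitioned = landing_directory("sales", "orders", when, time_partition=True)
```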
upload(local_filepath: str, bucket_name: str, directory: str) -> str
Uploads a local file to GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_filepath | str | The path to the local file. | required |
bucket_name | str | The name of the GCS bucket. | required |
directory | str | The directory within the bucket. | required |
Returns:
Name | Type | Description |
---|---|---|
str | str | The path to the uploaded file in GCS. |
upload_folder(local_path: str, bucket: str, gcs_path: str) -> None
Uploads a folder to GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_path | str | The local folder path. | required |
bucket | str | The name of the GCS bucket. | required |
gcs_path | str | The GCS path to upload to. | required |
upload_from_memory(data: Any, bucket: str, directory: str, filename: str, **kwargs: Any) -> str
Uploads data from memory to GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Any | The data to upload. | required |
bucket | str | The name of the GCS bucket. | required |
directory | str | The directory within the bucket. | required |
filename | str | The name of the file to create. | required |
Kwargs:
add_timestamp (bool, optional): If True, adds a timestamp to the filename. Defaults to True.
use_ndjson (bool, optional): If True, writes data in NDJSON format. Defaults to False.
mode (str, optional): The mode for opening the file. Defaults to 'w'.
Returns:
Name | Type | Description |
---|---|---|
str | str | The path to the uploaded file. |
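The `add_timestamp` and `use_ndjson` keyword arguments change the object name and the serialized body respectively. The sketch below shows one way those two options could work. The timestamp format, its position before the file extension, and the helper names `timestamped_name` and `serialize` are assumptions; the hook's actual naming scheme is not documented here.

```python
import json
import os
from datetime import datetime
from typing import Any


def timestamped_name(filename: str, now: datetime, add_timestamp: bool = True) -> str:
    """Insert a timestamp before the extension (assumed format and position)."""
    if not add_timestamp:
        return filename
    stem, ext = os.path.splitext(filename)
    return f"{stem}_{now:%Y%m%dT%H%M%S}{ext}"


def serialize(data: Any, use_ndjson: bool = False) -> str:
    """Serialize as one JSON document, or as one JSON document per line."""
    if use_ndjson:
        return "\n".join(json.dumps(record) for record in data)
    return json.dumps(data)


name = timestamped_name("events.json", datetime(2024, 1, 2, 3, 4, 5))
body = serialize([{"a": 1}, {"b": 2}], use_ndjson=True)
```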
upload_parquet_from_memory(data: Any, bucket: str, directory: str, filename: str, **kwargs: Any) -> str
Uploads Parquet data from memory to GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Any | The data to upload. | required |
bucket | str | The name of the GCS bucket. | required |
directory | str | The directory within the bucket. | required |
filename | str | The name of the Parquet file to create. | required |
Kwargs:
schema (pa.Schema, optional): The schema for the Parquet table. Defaults to None.
add_timestamp (bool, optional): If True, adds a timestamp to the filename. Defaults to True.
Returns:
Name | Type | Description |
---|---|---|
str | str | The path to the uploaded Parquet file. |
GcsHook
Bases: BaseHook
Hook for interacting with Google Cloud Storage.
__init__() -> None
Initializes the GcsHook.
build_filepath(bucket: str, filepath: str) -> str
Builds the full GCS file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
Returns:
Name | Type | Description |
---|---|---|
str | str | The full GCS file path. |
check_existance(bucket: str, filepath: str) -> bool
Checks if a file exists in GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
Returns:
Name | Type | Description |
---|---|---|
bool | bool | True if the file exists, False otherwise. |
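In the `google-cloud-storage` client, an existence check is typically expressed as `client.bucket(name).blob(path).exists()`. The sketch below wraps that call chain in a function and exercises it against small in-memory stand-ins so it runs without credentials. Whether `check_existance` is implemented this way is an assumption, and `blob_exists` and the `_Fake*` classes are hypothetical names.

```python
from typing import Dict, Set


def blob_exists(client, bucket: str, filepath: str) -> bool:
    """Return True if the object is present, using the client's blob handle."""
    return client.bucket(bucket).blob(filepath).exists()


# Minimal in-memory stand-ins that mimic the three calls used above.
class _FakeBlob:
    def __init__(self, present: bool) -> None:
        self._present = present

    def exists(self) -> bool:
        return self._present


class _FakeBucket:
    def __init__(self, names: Set[str]) -> None:
        self._names = names

    def blob(self, name: str) -> _FakeBlob:
        return _FakeBlob(name in self._names)


class _FakeClient:
    def __init__(self, contents: Dict[str, Set[str]]) -> None:
        self._contents = contents

    def bucket(self, name: str) -> _FakeBucket:
        return _FakeBucket(self._contents.get(name, set()))


fake = _FakeClient({"my-bucket": {"landing/events.json"}})
found = blob_exists(fake, "my-bucket", "landing/events.json")
```

Passing a real `google.cloud.storage.Client` instead of the stand-in should work the same way, since only `bucket`, `blob`, and `exists` are called.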
copy_blobs(bucket: storage.Bucket, blobs: List[storage.Blob], to_bucket: storage.Bucket, to_directory: str) -> None
Copies blobs from one bucket to another.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | Bucket | The source bucket. | required |
blobs | List[Blob] | The list of blobs to copy. | required |
to_bucket | Bucket | The destination bucket. | required |
to_directory | str | The destination directory. | required |
delete(bucket_name: str, prefix: Optional[str] = None, files: Optional[List[str]] = None) -> None
Deletes files from GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket_name | str | The name of the GCS bucket. | required |
prefix | Optional[str] | The prefix for files to delete. Defaults to None. | None |
files | Optional[List[str]] | The list of specific files to delete. Defaults to None. | None |
delete_blobs(blobs: List[storage.Blob]) -> None
Deletes a list of blobs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
blobs | List[Blob] | The list of blobs to delete. | required |
download(bucket: str, filepath: str, target_filepath: Optional[str] = None) -> None
Downloads a file from GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
target_filepath | Optional[str] | The target file path. Defaults to None. | None |
files_to_blobs(bucket: storage.Bucket, files: List[str]) -> List[storage.Blob]
Converts a list of file names to blobs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | Bucket | The GCS bucket. | required |
files | List[str] | The list of file names. | required |
Returns:
Type | Description |
---|---|
List[Blob] | The list of blobs. |
list(bucket_name: str, prefix: Optional[str] = None) -> List[storage.Blob]
Lists blobs in a GCS bucket.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket_name | str | The name of the GCS bucket. | required |
prefix | Optional[str] | The prefix to filter blobs. Defaults to None. | None |
Returns:
Type | Description |
---|---|
List[Blob] | The list of blobs. |
move(from_bucket: str, from_prefix: str, to_bucket: str, to_directory: str, rewrite: bool) -> None
Moves files from one GCS location to another.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_bucket | str | The source bucket. | required |
from_prefix | str | The source prefix. | required |
to_bucket | str | The destination bucket. | required |
to_directory | str | The destination directory. | required |
rewrite | bool | Whether to overwrite existing files. | required |
move_blobs(bucket: storage.Bucket, blobs: List[storage.Blob], to_bucket: storage.Bucket, to_directory: str, rewrite: bool) -> None
Moves blobs from one bucket to another.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | Bucket | The source bucket. | required |
blobs | List[Blob] | The list of blobs to move. | required |
to_bucket | Bucket | The destination bucket. | required |
to_directory | str | The destination directory. | required |
rewrite | bool | Whether to overwrite existing files. | required |
move_files(from_bucket: str, files: List[str], to_bucket: str, to_directory: str, rewrite: bool) -> None
Moves specified files from one GCS location to another.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
from_bucket | str | The source bucket. | required |
files | List[str] | The list of files to move. | required |
to_bucket | str | The destination bucket. | required |
to_directory | str | The destination directory. | required |
rewrite | bool | Whether to overwrite existing files. | required |
read_as_bytes(bucket: str, filepath: str) -> bytes
Reads a file from GCS as bytes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
Returns:
Name | Type | Description |
---|---|---|
bytes | bytes | The content of the file as bytes. |
read_as_string(bucket: str, filepath: str, encoding: Optional[str] = None) -> str
Reads a file from GCS as a string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
encoding | Optional[str] | The encoding to use. Defaults to None. | None |
Returns:
Name | Type | Description |
---|---|---|
str | str | The content of the file as a string. |
read_json(bucket: str, filepath: str, encoding: Optional[str] = None) -> Any
Reads a JSON file from GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
encoding | Optional[str] | The encoding to use. Defaults to None. | None |
Returns:
Name | Type | Description |
---|---|---|
Any | Any | The content of the JSON file. |
read_ndjson(bucket: str, filepath: str, encoding: Optional[str] = None) -> List[Any]
Reads an NDJSON file from GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bucket | str | The name of the GCS bucket. | required |
filepath | str | The file path. | required |
encoding | Optional[str] | The encoding to use. Defaults to None. | None |
Returns:
Type | Description |
---|---|
List[Any] | The content of the NDJSON file. |
rewrite_blobs(blobs: List[storage.Blob], to_bucket: storage.Bucket, to_directory: str) -> None
Rewrites blobs in the destination bucket.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
blobs | List[Blob] | The list of blobs to rewrite. | required |
to_bucket | Bucket | The destination bucket. | required |
to_directory | str | The destination directory. | required |
upload(local_filepath: str, bucket_name: str, directory: str) -> str
Uploads a local file to GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_filepath | str | The path to the local file. | required |
bucket_name | str | The name of the GCS bucket. | required |
directory | str | The directory within the bucket. | required |
Returns:
Name | Type | Description |
---|---|---|
str | str | The path to the uploaded file in GCS. |
upload_folder(local_path: str, bucket: str, gcs_path: str) -> None
Uploads a folder to GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_path | str | The local folder path. | required |
bucket | str | The name of the GCS bucket. | required |
gcs_path | str | The GCS path to upload to. | required |
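Uploading a folder comes down to walking the local tree and mapping each file to an object name under `gcs_path`. The sketch below computes that mapping without performing any upload. It assumes the folder structure is preserved beneath `gcs_path`, which the reference does not confirm, and `plan_folder_upload` is a hypothetical helper name.

```python
import os
import posixpath
from typing import List, Tuple


def plan_folder_upload(local_path: str, gcs_path: str) -> List[Tuple[str, str]]:
    """Return (local file, object name) pairs for every file under local_path."""
    plan = []
    for root, _dirs, files in os.walk(local_path):
        for name in files:
            local_file = os.path.join(root, name)
            relative = os.path.relpath(local_file, local_path)
            # Object names always use "/", whatever the local separator is.
            object_name = posixpath.join(gcs_path.strip("/"), *relative.split(os.sep))
            plan.append((local_file, object_name))
    return sorted(plan)
```

Each pair could then be handed to a single-file upload call; keeping the planning step separate makes the name mapping easy to check before any data is sent.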
upload_from_memory(data: Any, bucket: str, directory: str, filename: str, **kwargs: Any) -> str
Uploads data from memory to GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Any | The data to upload. | required |
bucket | str | The name of the GCS bucket. | required |
directory | str | The directory within the bucket. | required |
filename | str | The name of the file to create. | required |
Kwargs:
add_timestamp (bool, optional): If True, adds a timestamp to the filename. Defaults to True.
use_ndjson (bool, optional): If True, writes data in NDJSON format. Defaults to False.
mode (str, optional): The mode for opening the file. Defaults to 'w'.
Returns:
Name | Type | Description |
---|---|---|
str | str | The path to the uploaded file. |
upload_parquet_from_memory(data: Any, bucket: str, directory: str, filename: str, **kwargs: Any) -> str
Uploads Parquet data from memory to GCS.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | Any | The data to upload. | required |
bucket | str | The name of the GCS bucket. | required |
directory | str | The directory within the bucket. | required |
filename | str | The name of the Parquet file to create. | required |
Kwargs:
schema (pa.Schema, optional): The schema for the Parquet table. Defaults to None.
add_timestamp (bool, optional): If True, adds a timestamp to the filename. Defaults to True.
Returns:
Name | Type | Description |
---|---|---|
str | str | The path to the uploaded Parquet file. |