Data Load¶
The load module is responsible for loading data into the storage and access points of the IRIS Platform. You can write data to our Azure Data Lake Storage Gen2 and Blob Storages (like Bifrost…), as well as to our Presto/Trino external tables and to Data Warehouses or Data Marts.
File writer¶
If you need to write a dataframe to a file system, you may use the class pyiris.ingestion.load.FileWriter; see the module documentation for more information. Usage example:
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import FileWriter
file_config = FileSystemConfig(format = 'parquet', path = 'Seguranca/AtosInseguros', country = 'Brazil', mount_name = 'consumezone', mode = 'overwrite', partition_by="id")
file_writer = FileWriter(config = file_config)
file_writer.write(dataframe=extracted_dataset)
The mount_name argument sets the storage where the data will be loaded. Please consult the pyiris.ingestion.validator.file_writer_validator.FileWriterValidator module documentation for more information about argument constraints.
The partition_by argument is optional. You may pass a single column, or a list of columns for multi-level partitioning.
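For instance, a minimal sketch of multi-level partitioning; the columns year and month below are hypothetical and should be replaced by columns that actually exist in your dataframe:
file_config = FileSystemConfig(format = 'parquet', path = 'Seguranca/AtosInseguros', country = 'Brazil', mount_name = 'consumezone', mode = 'overwrite', partition_by=["year", "month"])  # "year" and "month" are hypothetical partition columns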
Presto writer¶
This feature is responsible for creating or updating an external table in our Presto/Trino. Example:
from pyiris.ingestion.load.writers.presto_writer import PrestoWriter
from pyiris.ingestion.config.presto_config import PrestoConfig
presto_config = PrestoConfig(format = 'parquet', path = 'Seguranca/AtosInseguros', country = 'Brazil', mount_name = 'consumezone', schema = 'segambev', table_name = 'atos_inseguros', partition_by="id")
presto_writer = PrestoWriter(config=presto_config)
presto_writer.write(dataframe=extracted_dataset)
Please consult the pyiris.ingestion.validator.presto_writer_validator.PrestoWriterValidator module documentation for more information about argument constraints.
The partition_by argument is optional. You may pass a single column, or a list of columns for multi-level partitioning, but the partition column(s) must be the last one(s) in the schema. You can reorder the columns with a select.
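A minimal sketch of such a reorder, assuming the partition column is id (as in the example above); the helper variable names are illustrative:
partition_cols = ["id"]  # partition column(s) must be the last column(s) in the schema
other_cols = [c for c in extracted_dataset.columns if c not in partition_cols]
extracted_dataset = extracted_dataset.select(*other_cols, *partition_cols)  # move partition column(s) to the end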
If you only want to synchronize partitions of an existing Presto/Trino table, set the sync_mode parameter to ADD - to add partitions that exist on storage but not in the metastore - or DROP - to drop partitions that exist in the metastore but no longer exist on the source. It is important to emphasize that when the sync_mode parameter is not set, the default is FULL, which runs all the commands to create a new table and perform a full partition sync.
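For example, a sketch of a partition-only sync for the table above, keeping all other arguments from the previous example:
presto_config = PrestoConfig(format = 'parquet', path = 'Seguranca/AtosInseguros', country = 'Brazil', mount_name = 'consumezone', schema = 'segambev', table_name = 'atos_inseguros', partition_by="id", sync_mode='ADD')  # 'ADD' only registers new partitions; use 'DROP' to remove stale ones
presto_writer = PrestoWriter(config=presto_config)
presto_writer.write(dataframe=extracted_dataset)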
To access the private environment from the public environment, or the public environment from the private environment, it is necessary to pass the storage_account_name and the catalog, using the secrets below. In this scenario, the mount_name argument (PrestoConfig) must contain the container name instead of the workspace mount name. E.g.: workspace mount consumezoneprivate -> container name consumezone.
Private KV:
This secret is used to identify the private storage account: IrisPrestoConfigPrivateStorageAccountName
This secret is used to identify the private catalog name: IrisPrestoConfigPrivateCatalogName
Public KV:
This secret is used to identify the public storage account: IrisPrestoConfigPublicStorageAccountName
This secret is used to identify the public catalog name: IrisPrestoConfigPublicCatalogName
Example of the Public environment accessing the Private environment
presto_config = PrestoConfig(
format = 'parquet',
path = 'path',
country = 'Brazil',
mount_name = 'consumezone', # Container name instead of mount name
schema = schema,
table_name = table_name,
sync_mode = 'FULL',
storage_account_name=dbutils.secrets.get(scope="keyvault", key="IrisPrestoConfigPrivateStorageAccountName"),
catalog=dbutils.secrets.get(scope="keyvault", key="IrisPrestoConfigPrivateCatalogName")
)
Example of the Private environment accessing the Public environment
presto_config = PrestoConfig(
format = 'parquet',
path = 'path',
country = 'Brazil',
mount_name = 'consumezone', # Container name instead of mount name
schema = schema,
table_name = table_name,
sync_mode = 'FULL',
storage_account_name=dbutils.secrets.get(scope="keyvault", key="IrisPrestoConfigPublicStorageAccountName"),
catalog=dbutils.secrets.get(scope="keyvault", key="IrisPrestoConfigPublicCatalogName")
)
Data Warehouse or Data Mart writers¶
If you want to create or update a table in a Data Warehouse (Azure Data Warehouse) or a Data Mart (Azure SQL Server), you can use this feature. Example:
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.dw_config import DwWriterConfig
dw_config = DwWriterConfig(schema = "supply", table_name="segambev_atos_inseguros", mode="overwrite", temp_path="Brazil/Segambev/AtosInseguros", temp_container="consumezone", options={"maxStrLength": 4000})
dw_writer = SqlWriter(config=dw_config)
dw_writer.write(dataframe=extracted_dataset)
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.dm_config import DmWriterConfig
dm_config = DmWriterConfig(schema = "supply", table_name="segambev_atos_inseguros", mode="overwrite", truncate="True", options={"maxStrLength": 4000})
dm_writer = SqlWriter(config=dm_config)
dm_writer.write(dataframe=extracted_dataset)
The options argument is optional. You can pass any accepted Spark writer options in this field.
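For instance, a sketch of a Data Mart load with extra writer options; maxStrLength comes from the example above, while batchsize is a standard Spark JDBC write option and it is only an assumption that the underlying connector forwards it:
dm_config = DmWriterConfig(schema = "supply", table_name="segambev_atos_inseguros", mode="overwrite", truncate="True", options={"maxStrLength": 4000, "batchsize": 10000})  # "batchsize" is assumed to be forwarded to the Spark JDBC writer
dm_writer = SqlWriter(config=dm_config)
dm_writer.write(dataframe=extracted_dataset)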
OBS: the schema must already exist in the Data Warehouse before a table can be created.
Please consult the pyiris.ingestion.validator.sql_writer_validator.SqlWriterValidator documentation for more information about argument constraints.
Writer example usage - brewdat¶
When reading and writing with brewdat as your mount_name in the development environment, you will be connected to the Iris Blob. In the production environment, you will be connected to the Brewdat Blob.
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import FileWriter
file_config = FileSystemConfig(format = 'parquet', path = 'Seguranca/AtosInseguros', country = 'Brazil', mount_name = 'brewdat', mode = 'overwrite', partition_by="id")
file_writer = FileWriter(config = file_config)
file_writer.write(dataframe=extracted_dataset)
Load service¶
The class pyiris.ingestion.load.LoadService works as a service: with it you can execute several loads in sequence, like a scheduler. Follow the example below. OBS: the load tasks are always executed in this order of preference: FileWriter, DwWriter or DmWriter, and PrestoWriter.
from pyiris.ingestion.config.dw_config import DwWriterConfig
from pyiris.ingestion.load.writers.presto_writer import PrestoWriter
from pyiris.ingestion.config.presto_config import PrestoConfig
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import LoadService, FileWriter
dw_config = DwWriterConfig(schema = "supply", table_name="segambev_atos_inseguros", mode="overwrite", temp_path="Brazil/Segambev/AtosInseguros", temp_container="consumezone")
presto_config = PrestoConfig(format = 'parquet', path = 'Seguranca/AtosInseguros', country = 'Brazil', mount_name = 'consumezone', schema = 'segambev', table_name = 'atos_inseguros')
third_blob_config = FileSystemConfig(format = 'parquet', path = 'Seguranca/AtosInseguros', country = 'Brazil', mount_name = 'bifrost', mode = 'overwrite')
writers = [
FileWriter(config=third_blob_config),
PrestoWriter(config=presto_config),
SqlWriter(config=dw_config)
]
load_service = LoadService(writers=writers)
load_service.commit(dataframe=extracted_dataset)