Data Load¶
The load module is responsible for loading data into the storage and access points of the IRIS Platform. You can write data to our Azure Data Lake Storage Gen2, to Blob Storages (like Bifrost…), and also to our Presto/Trino external tables and to Data Warehouses or Data Marts.
File writer¶
If you need to write a dataframe to a file system, you may use our class pyiris.ingestion.load.FileWriter; see the module documentation for more information. Usage example:
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import FileWriter
file_config = FileSystemConfig(format='parquet', path='Seguranca/AtosInseguros', country='Brazil', mount_name='consumezone', mode='overwrite', partition_by="id")
file_writer = FileWriter(config=file_config)
file_writer.write(dataframe=extracted_dataset)
The mount_name argument sets the storage where the data will be loaded. Please consult the pyiris.ingestion.validator.file_writer_validator.FileWriterValidator module documentation for more information about argument constraints.
The partition_by argument is optional. You may pass a single column, or a list of columns for multiple partitioning.
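For example, to partition by multiple columns, pass a list; the year column below is illustrative:
# "year" is an illustrative column name; use columns from your own dataframe
file_config = FileSystemConfig(format='parquet', path='Seguranca/AtosInseguros', country='Brazil', mount_name='consumezone', mode='overwrite', partition_by=["id", "year"])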
Presto writer¶
This feature is responsible for creating or updating an external table in our Presto/Trino. Example:
from pyiris.ingestion.load.writers.presto_writer import PrestoWriter
from pyiris.ingestion.config.presto_config import PrestoConfig
presto_config = PrestoConfig(format='parquet', path='Seguranca/AtosInseguros', country='Brazil', mount_name='consumezone', schema='segambev', table_name='atos_inseguros', partition_by="id")
presto_writer = PrestoWriter(config=presto_config)
presto_writer.write(dataframe=extracted_dataset)
Please consult the pyiris.ingestion.validator.presto_writer_validator.PrestoWriterValidator module documentation for more information about argument constraints.
The partition_by argument is optional. You may pass a single column, or a list of columns for multiple partitioning, but the partition column(s) must be the last column(s) of the schema. You can reorder the columns with a select, as shown below.
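A minimal PySpark sketch of that reordering, assuming the partition column is id:
# Move the partition column "id" to the end of the schema before writing
ordered_columns = [column for column in extracted_dataset.columns if column != "id"] + ["id"]
extracted_dataset = extracted_dataset.select(*ordered_columns)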
If you only want to run a new partition sync on an existing Presto/Trino table, set the sync_mode parameter to ADD (to add partitions that exist on storage but are missing from the metastore) or DROP (to drop partitions that exist in the metastore but no longer exist on the source). Note that when the sync_mode parameter is not set, the default is FULL, which runs all the commands to create a new table and performs a full partition sync.
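For example, a run that only registers newly added partitions might look like this, using the same table as the example above (passing sync_mode through PrestoConfig is an assumption based on the parameter description):
# sync_mode="ADD" only registers partitions present on storage but missing from the metastore
presto_config = PrestoConfig(format='parquet', path='Seguranca/AtosInseguros', country='Brazil', mount_name='consumezone', schema='segambev', table_name='atos_inseguros', partition_by="id", sync_mode="ADD")
presto_writer = PrestoWriter(config=presto_config)
presto_writer.write(dataframe=extracted_dataset)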
Data Warehouse or Data Mart writers¶
If you want to create or update a table in a Data Warehouse (Azure Data Warehouse) or a Data Mart (Azure SQL Server), you can use this feature. The first example below writes to a Data Warehouse, the second to a Data Mart:
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.dw_config import DwWriterConfig
dw_config = DwWriterConfig(schema="supply", table_name="segambev_atos_inseguros", mode="overwrite", temp_path="Brazil/Segambev/AtosInseguros", temp_container="consumezone", options={"maxStrLength": 4000})
dw_writer = SqlWriter(config=dw_config)
dw_writer.write(dataframe=extracted_dataset)
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.dm_config import DmWriterConfig
dm_config = DmWriterConfig(schema="supply", table_name="segambev_atos_inseguros", mode="overwrite", truncate="True", options={"maxStrLength": 4000})
dm_writer = SqlWriter(config=dm_config)
dm_writer.write(dataframe=extracted_dataset)
The options argument is optional. You can pass any accepted Spark options in this field.
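For instance, assuming the writer forwards these options to the underlying Spark data source, you could combine maxStrLength with a standard Spark JDBC write option such as batchsize (whether a given option applies depends on the target):
# "batchsize" is a standard Spark JDBC write option; forwarding it here is an assumption
dm_config = DmWriterConfig(schema="supply", table_name="segambev_atos_inseguros", mode="overwrite", truncate="True", options={"maxStrLength": 4000, "batchsize": 10000})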
Note: the schema must already exist in the data warehouse before a table can be created.
Please consult the pyiris.ingestion.validator.sql_writer_validator.SqlWriterValidator documentation for more information about argument constraints.
Writer example usage - brewdat¶
When reading and writing with brewdat as your mount_name in the development environment, you will be connected to the Iris Blob. In the production environment, you will be connected to the Brewdat Blob.
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import FileWriter
file_config = FileSystemConfig(format='parquet', path='Seguranca/AtosInseguros', country='Brazil', mount_name='brewdat', mode='overwrite', partition_by="id")
file_writer = FileWriter(config=file_config)
file_writer.write(dataframe=extracted_dataset)
Load service¶
The class pyiris.ingestion.load.LoadService works as a service. With it you can execute several loads in sequence, like a scheduler. See the example below. Note: the load tasks are always executed in this order of precedence: FileWriter, DwWriter or DmWriter, and PrestoWriter.
from pyiris.ingestion.config.dw_config import DwWriterConfig
from pyiris.ingestion.load.writers.presto_writer import PrestoWriter
from pyiris.ingestion.config.presto_config import PrestoConfig
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import LoadService, FileWriter
dw_config = DwWriterConfig(schema="supply", table_name="segambev_atos_inseguros", mode="overwrite", temp_path="Brazil/Segambev/AtosInseguros", temp_container="consumezone")
presto_config = PrestoConfig(format='parquet', path='Seguranca/AtosInseguros', country='Brazil', mount_name='consumezone', schema='segambev', table_name='atos_inseguros')
third_blob_config = FileSystemConfig(format='parquet', path='Seguranca/AtosInseguros', country='Brazil', mount_name='bifrost', mode='overwrite')
writers = [
FileWriter(config=third_blob_config),
PrestoWriter(config=presto_config),
SqlWriter(config=dw_config)
]
load_service = LoadService(writers=writers)
load_service.commit(dataframe=extracted_dataset)