# Data Load

The load module is responsible for loading data into the storage and access points of the IRIS Platform. You can write data to our Azure Data Lake Storage Gen2, to Blob Storages (like Bifrost...), to our Presto/Trino external tables, and to Data Warehouses or Data Marts.

### File writer

If you have to write a dataframe to a file system, you may use our class **pyiris.ingestion.load.FileWriter**; see the module documentation for more information.

Usage example:

~~~Python
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import FileWriter

file_config = FileSystemConfig(format = 'parquet',
                               path = 'Seguranca/AtosInseguros',
                               country = 'Brazil',
                               mount_name = 'consumezone',
                               mode = 'overwrite',
                               partition_by="id")

file_writer = FileWriter(config = file_config)
file_writer.write(dataframe=extracted_dataset)
~~~

The **mount_name** argument sets the storage where the data will be loaded.

Please consult the **pyiris.ingestion.validator.file_writer_validator.FileWriterValidator** module documentation for more information about argument constraints.

The **partition_by** argument is optional. You may pass just one column, or a list of columns for multiple partitioning.

### Presto writer

This feature is responsible for creating or updating an external table in our Presto/Trino.

Example:

~~~Python
from pyiris.ingestion.load.writers.presto_writer import PrestoWriter
from pyiris.ingestion.config.presto_config import PrestoConfig

presto_config = PrestoConfig(format = 'parquet',
                             path = 'Seguranca/AtosInseguros',
                             country = 'Brazil',
                             mount_name = 'consumezone',
                             schema = 'segambev',
                             table_name = 'atos_inseguros',
                             partition_by="id")

presto_writer = PrestoWriter(config=presto_config)
presto_writer.write(dataframe=extracted_dataset)
~~~

Please consult the **pyiris.ingestion.validator.presto_writer_validator.PrestoWriterValidator** module documentation for more information about argument constraints.

The **partition_by** argument is optional. You may pass just one column, or a list of columns for multiple partitioning, but the partition column(s) must be the last ones in the schema. You can reorder the columns with a select.

If you just want to run a new partition sync on an existing Presto/Trino table, set the `sync_mode` parameter to `ADD` - to add partitions that exist on storage but are missing from the metastore - or `DROP` - to drop partitions that exist on the metastore but no longer exist on the source. It is important to emphasize that when the `sync_mode` parameter is not set, the default is `FULL`, which runs all commands to create a new table and perform a full partition sync.
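As a minimal sketch combining the two notes above - assuming the table already exists and that, besides `id`, the dataframe has the hypothetical columns `descricao` and `data` - a partition-only sync could look like this:

~~~Python
from pyiris.ingestion.load.writers.presto_writer import PrestoWriter
from pyiris.ingestion.config.presto_config import PrestoConfig

# The partition columns must be the last ones in the schema,
# so reorder the dataframe with a select before writing.
# "descricao" and "data" are hypothetical column names used only for illustration.
reordered_dataset = extracted_dataset.select("descricao", "data", "id")

presto_config = PrestoConfig(format = 'parquet',
                             path = 'Seguranca/AtosInseguros',
                             country = 'Brazil',
                             mount_name = 'consumezone',
                             schema = 'segambev',
                             table_name = 'atos_inseguros',
                             partition_by = ["data", "id"],  # a list of columns for multiple partitioning
                             sync_mode = 'ADD')              # only add partitions missing from the metastore

presto_writer = PrestoWriter(config=presto_config)
presto_writer.write(dataframe=reordered_dataset)
~~~

With `sync_mode = 'DROP'`, the same call would instead remove metastore partitions that no longer exist on the source.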
To access the private environment from the public environment, or the public environment from the private environment, it is necessary to pass the **storage_account_name** and the **catalog**, using the secrets below. In this scenario, indicate the container name in the **mount_name** argument (PrestoConfig) instead of the name of the workspace mount.

E.g.: mount_name: consumezoneprivate -> container name: consumezone

Private KV:

This secret is used to identify the private storage account:
```
IrisPrestoConfigPrivateStorageAccountName
```
This secret is used to identify the private Catalog Name:
```
IrisPrestoConfigPrivateCatalogName
```

Public KV:

This secret is used to identify the public storage account:
```
IrisPrestoConfigPublicStorageAccountName
```
This secret is used to identify the public Catalog Name:
```
IrisPrestoConfigPublicCatalogName
```

Example of the public environment accessing the private environment:

~~~Python
presto_config = PrestoConfig(
    format = 'parquet',
    path = 'path',
    country = 'Brazil',
    mount_name = 'consumezone',  # Container name instead of mount name
    schema = schema,
    table_name = table_name,
    sync_mode = 'FULL',
    storage_account_name=dbutils.secrets.get(scope="keyvault", key="IrisPrestoConfigPrivateStorageAccountName"),
    catalog=dbutils.secrets.get(scope="keyvault", key="IrisPrestoConfigPrivateCatalogName")
)
~~~

Example of the private environment accessing the public environment:

~~~Python
presto_config = PrestoConfig(
    format = 'parquet',
    path = 'path',
    country = 'Brazil',
    mount_name = 'consumezone',  # Container name instead of mount name
    schema = schema,
    table_name = table_name,
    sync_mode = 'FULL',
    storage_account_name=dbutils.secrets.get(scope="keyvault", key="IrisPrestoConfigPublicStorageAccountName"),
    catalog=dbutils.secrets.get(scope="keyvault", key="IrisPrestoConfigPublicCatalogName")
)
~~~

### Data Warehouse or Data Mart writers

If you want to create or update a table in a Data Warehouse (Azure Data Warehouse) or a Data Mart (Azure SQL Server), you can use this feature.

Example:

~~~Python
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.dw_config import DwWriterConfig

dw_config = DwWriterConfig(schema = "supply",
                           table_name="segambev_atos_inseguros",
                           mode="overwrite",
                           temp_path="Brazil/Segambev/AtosInseguros",
                           temp_container="consumezone",
                           options={"maxStrLength": 4000})

dw_writer = SqlWriter(config=dw_config)
dw_writer.write(dataframe=extracted_dataset)
~~~

~~~Python
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.dm_config import DmWriterConfig

dm_config = DmWriterConfig(schema = "supply",
                           table_name="segambev_atos_inseguros",
                           mode="overwrite",
                           truncate="True",
                           options={"maxStrLength": 4000})

dm_writer = SqlWriter(config=dm_config)
dm_writer.write(dataframe=extracted_dataset)
~~~

The **options** argument is optional. You can pass any accepted Spark arguments in this field.

OBS: an existing schema in the data warehouse is necessary to create a table.

Please consult the **pyiris.ingestion.validator.sql_writer_validator.SqlWriterValidator** documentation for more information about argument constraints.

### Writer example usage - brewdat

When reading and writing using brewdat as your mount_name in the development environment, you will be connected to the Iris Blob. In the production environment, you will be connected to the Brewdat Blob.

~~~Python
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import FileWriter

file_config = FileSystemConfig(format = 'parquet',
                               path = 'Seguranca/AtosInseguros',
                               country = 'Brazil',
                               mount_name = 'brewdat',
                               mode = 'overwrite',
                               partition_by="id")

file_writer = FileWriter(config = file_config)
file_writer.write(dataframe=extracted_dataset)
~~~
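As mentioned in the File writer section, **partition_by** also accepts a list of columns for multiple partitioning. A minimal sketch of the same brewdat write with two partition columns (the column name `data` is hypothetical and only illustrates the list form):

~~~Python
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import FileWriter

# Same write as above, but partitioned by two columns.
# "data" is a hypothetical column name used only for illustration.
file_config = FileSystemConfig(format = 'parquet',
                               path = 'Seguranca/AtosInseguros',
                               country = 'Brazil',
                               mount_name = 'brewdat',
                               mode = 'overwrite',
                               partition_by=["data", "id"])

file_writer = FileWriter(config = file_config)
file_writer.write(dataframe=extracted_dataset)
~~~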
## Load service

The class **pyiris.ingestion.load.LoadService** works as a service. With it you can execute several loads in sequence, as a scheduler. Follow the example below:

OBS: the load tasks will necessarily be executed in this order of preference: FileWriter, DwWriter or DmWriter, and PrestoWriter.

~~~Python
from pyiris.ingestion.config.dw_config import DwWriterConfig
from pyiris.ingestion.load.writers.presto_writer import PrestoWriter
from pyiris.ingestion.config.presto_config import PrestoConfig
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import LoadService, FileWriter

dw_config = DwWriterConfig(schema = "supply",
                           table_name="segambev_atos_inseguros",
                           mode="overwrite",
                           temp_path="Brazil/Segambev/AtosInseguros",
                           temp_container="consumezone")

presto_config = PrestoConfig(format = 'parquet',
                             path = 'Seguranca/AtosInseguros',
                             country = 'Brazil',
                             mount_name = 'consumezone',
                             schema = 'segambev',
                             table_name = 'atos_inseguros')

third_blob_config = FileSystemConfig(format = 'parquet',
                                     path = 'Seguranca/AtosInseguros',
                                     country = 'Brazil',
                                     mount_name = 'bifrost',
                                     mode = 'overwrite')

writers = [
    FileWriter(config=third_blob_config),
    PrestoWriter(config=presto_config),
    SqlWriter(config=dw_config)
]

load_service = LoadService(writers=writers)
load_service.commit(dataframe=extracted_dataset)
~~~