# Data Load

The load module is responsible for loading data into the storage and access points of the IRIS Platform. You can write data to our Azure Data Lake Storage Gen2, to Blob Storages (like Bifrost...), to our Presto/Trino external tables, and to Data Warehouses or Data Marts.

### File writer

If you need to write a dataframe to a file system, you may use the class **pyiris.ingestion.load.FileWriter**; see the module documentation for more information.

Usage example:

~~~Python
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import FileWriter

file_config = FileSystemConfig(format='parquet',
                               path='Seguranca/AtosInseguros',
                               country='Brazil',
                               mount_name='consumezone',
                               mode='overwrite',
                               partition_by="id")

file_writer = FileWriter(config=file_config)
file_writer.write(dataframe=extracted_dataset)
~~~

The **mount_name** argument sets the storage where the data will be loaded. Please consult the **pyiris.ingestion.validator.file_writer_validator.FileWriterValidator** module documentation for more information about argument constraints.

The **partition_by** argument is optional. You may pass a single column, or a list of columns for multiple partitioning.

### Presto writer

This feature is responsible for creating or updating an external table in our Presto/Trino.

Example:

~~~Python
from pyiris.ingestion.load.writers.presto_writer import PrestoWriter
from pyiris.ingestion.config.presto_config import PrestoConfig

presto_config = PrestoConfig(format='parquet',
                             path='Seguranca/AtosInseguros',
                             country='Brazil',
                             mount_name='consumezone',
                             schema='segambev',
                             table_name='atos_inseguros',
                             partition_by="id")

presto_writer = PrestoWriter(config=presto_config)
presto_writer.write(dataframe=extracted_dataset)
~~~

Please consult the **pyiris.ingestion.validator.presto_writer_validator.PrestoWriterValidator** module documentation for more information about argument constraints.

The **partition_by** argument is optional. You may pass a single column, or a list of columns for multiple partitioning, but the partition column(s) must be the last column(s) of the schema. You can reorder the columns with a select.

If you only want to run a new partition sync on an existing Presto/Trino table, set the `sync_mode` parameter to `ADD` - to register partitions that exist on storage but not yet on the metastore - or `DROP` - to drop partitions that exist on the metastore but no longer exist on the source. When the `sync_mode` parameter is not set, the default is `FULL`, which runs every command needed to create a new table and performs a full partition sync.
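For instance, a partition-only sync on a table that already exists might look like the minimal sketch below, which reuses the configuration from the previous example and assumes `sync_mode` is accepted by `PrestoConfig` alongside the other arguments:

~~~Python
from pyiris.ingestion.load.writers.presto_writer import PrestoWriter
from pyiris.ingestion.config.presto_config import PrestoConfig

# Minimal sketch: only sync partitions of an already existing table.
# Assumption: sync_mode is passed to PrestoConfig like the other options.
presto_sync_config = PrestoConfig(format='parquet',
                                  path='Seguranca/AtosInseguros',
                                  country='Brazil',
                                  mount_name='consumezone',
                                  schema='segambev',
                                  table_name='atos_inseguros',
                                  partition_by="id",
                                  sync_mode='ADD')  # or 'DROP'; omit for the default 'FULL'

presto_writer = PrestoWriter(config=presto_sync_config)
presto_writer.write(dataframe=extracted_dataset)
~~~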
### Data Warehouse or Data Mart writers

If you want to create or update a table in a Data Warehouse (Azure Data Warehouse) or a Data Mart (Azure SQL Server), you can use this feature.

Data Warehouse example:

~~~Python
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.dw_config import DwWriterConfig

dw_config = DwWriterConfig(schema="supply",
                           table_name="segambev_atos_inseguros",
                           mode="overwrite",
                           temp_path="Brazil/Segambev/AtosInseguros",
                           temp_container="consumezone",
                           options={"maxStrLength": 4000})

dw_writer = SqlWriter(config=dw_config)
dw_writer.write(dataframe=extracted_dataset)
~~~

Data Mart example:

~~~Python
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.dm_config import DmWriterConfig

dm_config = DmWriterConfig(schema="supply",
                           table_name="segambev_atos_inseguros",
                           mode="overwrite",
                           truncate="True",
                           options={"maxStrLength": 4000})

dm_writer = SqlWriter(config=dm_config)
dm_writer.write(dataframe=extracted_dataset)
~~~

The **options** argument is optional. You can pass any Spark write option accepted for the target in this field.

OBS: the schema must already exist in the data warehouse before a table can be created.

Please consult the **pyiris.ingestion.validator.sql_writer_validator.SqlWriterValidator** module documentation for more information about argument constraints.

### Writer example usage - brewdat

When reading and writing with brewdat as your mount_name in the development environment, you will be connected to the Iris Blob. In the production environment, you will be connected to the Brewdat Blob.

~~~Python
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import FileWriter

file_config = FileSystemConfig(format='parquet',
                               path='Seguranca/AtosInseguros',
                               country='Brazil',
                               mount_name='brewdat',
                               mode='overwrite',
                               partition_by="id")

file_writer = FileWriter(config=file_config)
file_writer.write(dataframe=extracted_dataset)
~~~

## Load service

The class **pyiris.ingestion.load.LoadService** works as a service. With it you can execute several loads in sequence, like a scheduler. Follow the example below.

OBS: the load tasks are always executed in this order of preference: FileWriter, DwWriter or DmWriter, and PrestoWriter.

~~~Python
from pyiris.ingestion.config.dw_config import DwWriterConfig
from pyiris.ingestion.load.writers.presto_writer import PrestoWriter
from pyiris.ingestion.config.presto_config import PrestoConfig
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import LoadService, FileWriter

dw_config = DwWriterConfig(schema="supply",
                           table_name="segambev_atos_inseguros",
                           mode="overwrite",
                           temp_path="Brazil/Segambev/AtosInseguros",
                           temp_container="consumezone")

presto_config = PrestoConfig(format='parquet',
                             path='Seguranca/AtosInseguros',
                             country='Brazil',
                             mount_name='consumezone',
                             schema='segambev',
                             table_name='atos_inseguros')

third_blob_config = FileSystemConfig(format='parquet',
                                     path='Seguranca/AtosInseguros',
                                     country='Brazil',
                                     mount_name='bifrost',
                                     mode='overwrite')

writers = [
    FileWriter(config=third_blob_config),
    PrestoWriter(config=presto_config),
    SqlWriter(config=dw_config)
]

load_service = LoadService(writers=writers)
load_service.commit(dataframe=extracted_dataset)
~~~
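If the relational target is a Data Mart instead of a Data Warehouse, the same pattern applies. The sketch below is a minimal, illustrative variation that reuses the `DmWriterConfig` and `FileSystemConfig` values shown earlier; all values are examples, not required settings:

~~~Python
from pyiris.ingestion.config.dm_config import DmWriterConfig
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import LoadService, FileWriter

# Illustrative values only; see the argument constraints documented above.
dm_config = DmWriterConfig(schema="supply",
                           table_name="segambev_atos_inseguros",
                           mode="overwrite",
                           truncate="True")

file_config = FileSystemConfig(format='parquet',
                               path='Seguranca/AtosInseguros',
                               country='Brazil',
                               mount_name='consumezone',
                               mode='overwrite')

# Regardless of the list order, the FileWriter task runs before the SqlWriter,
# following the order of preference described above.
writers = [SqlWriter(config=dm_config), FileWriter(config=file_config)]

load_service = LoadService(writers=writers)
load_service.commit(dataframe=extracted_dataset)
~~~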