Data Extract

The extract module is responsible for getting data from the source. It contains the reader classes and their related helpers.

File Reader

The file reader class is responsible for reading files from the source and returning a Spark dataframe. There are two ways to read:

One dataset

To read a single dataset, create a FileReader and call its consume method, as in this example:

from pyiris.ingestion.extract import FileReader
from pyiris.infrastructure import Spark

pyiris_spark = Spark()

file_reader_config = FileReader(table_id='atos_inseguros', mount_name='consumezone', country='Brazil', path='Seguranca/AtosInseguros', format='parquet')
dataset = file_reader_config.consume(spark=pyiris_spark)

You can also filter the dataset as you read it by passing a filter expression to the consume method:

from pyiris.ingestion.extract import FileReader
from pyiris.infrastructure import Spark

pyiris_spark = Spark()

file_reader = FileReader(table_id="test_dataframe", mount_name="consumezone", country="Brazil", path="Seguranca/AtosInseguros", format="parquet")
filtered_dataframe = file_reader.consume(spark=pyiris_spark, filter="age == 30")

filtered_dataframe.show()

Reader example usage - brewdat

When you read or write with brewdat as your mount_name in the development environment, you are connected to the Iris Blob (analyticsplatformblob), using the container “brewdat”. In the production environment, you are connected to the Brewdat Blob (brewdatblobextsagbdev), using the container “iris-plz”.

from pyiris.ingestion.extract import FileReader
from pyiris.infrastructure import Spark

pyiris_spark = Spark()

file_reader_config = FileReader(table_id='atos_inseguros', mount_name='brewdat', country='Brazil', path='Seguranca/AtosInseguros', format='parquet')
dataset = file_reader_config.consume(spark=pyiris_spark)
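
The environment-dependent routing described above can be summarised as a small lookup table. This is only a sketch and not pyiris code: BlobTarget, BREWDAT_TARGETS, resolve_brewdat_target and the environment labels "development" and "production" are hypothetical names introduced for illustration. Only the storage account and container names come from the text above.

```python
from typing import NamedTuple


class BlobTarget(NamedTuple):
    """Storage account and container that a mount name resolves to."""
    storage_account: str
    container: str


# Hypothetical lookup table. The account and container names are the ones
# quoted in this section; the dictionary keys are assumed environment labels.
BREWDAT_TARGETS = {
    "development": BlobTarget("analyticsplatformblob", "brewdat"),
    "production": BlobTarget("brewdatblobextsagbdev", "iris-plz"),
}


def resolve_brewdat_target(environment: str) -> BlobTarget:
    """Return the blob target used for mount_name='brewdat' in an environment."""
    try:
        return BREWDAT_TARGETS[environment]
    except KeyError:
        raise ValueError(f"unknown environment: {environment!r}") from None


print(resolve_brewdat_target("development"))
```

The practical consequence is that the same FileReader call reads from different storage depending on where it runs, so a dataset written in development will not necessarily be present in production.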

More than one dataset

To read more than one dataset, use the class pyiris.ingestion.extract.extract_service.ExtractService. It works as a service that takes a list of readers and a query. In this example the query refers to each dataset by the table_id of its reader:

from pyiris.ingestion.extract import FileReader, ExtractService
from pyiris.infrastructure import Spark

pyiris_spark = Spark()

readers = [
    FileReader(table_id='atos_inseguros', mount_name='consumezone', country='Brazil', path='Seguranca/AtosInseguros', format='parquet'),
    FileReader(table_id='condicao_insegura', mount_name='consumezone', country='Brazil', path='Seguranca/CondicaoInsegura', format='parquet'),
]
query = """
    SELECT 
        * 
    FROM 
        atos_inseguros 
    INNER JOIN
        condicao_insegura
    ON 
        atos_inseguros.ID == condicao_insegura.ID
"""

extract_service = ExtractService(readers=readers, query=query)
dataset = extract_service.handler(spark=pyiris_spark)
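
To make the join in the query above concrete, here is a minimal stand-alone sketch that uses Python's built-in sqlite3 module in place of Spark. It assumes, as the example suggests, that each dataset is visible to the query under its reader's table_id. It does not call pyiris, and the columns description and location and the sample rows are invented for illustration.

```python
import sqlite3

# In-memory SQLite database standing in for the Spark session (assumption).
conn = sqlite3.connect(":memory:")

# Each table is named after the table_id of the corresponding reader.
# The non-ID columns and all rows are hypothetical sample data.
conn.execute("CREATE TABLE atos_inseguros (ID INTEGER, description TEXT)")
conn.execute("CREATE TABLE condicao_insegura (ID INTEGER, location TEXT)")
conn.executemany(
    "INSERT INTO atos_inseguros VALUES (?, ?)",
    [(1, "no helmet"), (2, "no gloves")],
)
conn.executemany(
    "INSERT INTO condicao_insegura VALUES (?, ?)",
    [(2, "line 3"), (3, "dock")],
)

# Same join shape as the query above, with explicit columns instead of *.
query = """
    SELECT atos_inseguros.ID, description, location
    FROM atos_inseguros
    INNER JOIN condicao_insegura
    ON atos_inseguros.ID = condicao_insegura.ID
"""
rows = conn.execute(query).fetchall()
print(rows)  # only ID 2 appears in both tables
```

The sketch selects explicit columns because SELECT * on a join of two tables that share a column name, such as ID here, returns that column twice, which in Spark makes later references to it ambiguous.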

For more information, see the docstrings in the pyiris modules.