Task

The `Task` object runs an `ExtractService`, an optional `TransformService`, and a `LoadService`.

Example code

Set up the ExtractService, TransformService and LoadService objects:

from pyiris.ingestion.extract import FileReader, ExtractService

from pyiris.ingestion.transform import TransformService, HashTransformation, SqlTransformation
from pyiris.ingestion.config.dw_config import DwWriterConfig
from pyiris.ingestion.load.writers.presto_writer import PrestoWriter
from pyiris.ingestion.config.presto_config import PrestoConfig
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.file_system_config import FileSystemConfig

from pyiris.ingestion.load import LoadService, FileWriter

# Source tables. The query below refers to each table by the same name as
# its reader's table_id.
readers = [
    FileReader(
        table_id='atos_inseguros',
        mount_name='consumezone',
        country='Brazil',
        path='Seguranca/AtosInseguros',
        format='parquet',
    ),
    FileReader(
        table_id='condicao_insegura',
        mount_name='consumezone',
        country='Brazil',
        path='Seguranca/CondicaoInsegura',
        format='parquet',
    ),
]

# Join with USING (ID) rather than ON a.ID = b.ID: with `SELECT *`, an
# ON-join keeps both tables' ID columns, and Spark refuses to write a
# DataFrame with duplicate column names to file-based sinks (e.g. the
# parquet FileWriter configured below). Any other column name present in
# both tables would still be duplicated; list the columns explicitly if so.
query = """
    SELECT
        *
    FROM
        atos_inseguros
    INNER JOIN
        condicao_insegura
    USING (ID)
"""

extract_service = ExtractService(readers=readers, query=query)


# Transformations handed to the TransformService.
# NOTE(review): `price`, `quantity` and `seller_cpf` do not obviously belong to
# the safety datasets read above; treat them as placeholders and adjust them
# to the real schema.
transform_service = TransformService(
    transformations=[
        # Writes the SQL expression `price/quantity` to the `middle_price` column.
        SqlTransformation(
            name='divide',
            description='Getting middle price',
            to_column="middle_price",
            sql_expression="price/quantity",
        ),
        # Hashes the `seller_cpf` column (personal data, per the LGPD note).
        HashTransformation(
            name='Hash CPF',
            description='Hash CPF to be according to LGPD',
            from_columns=["seller_cpf"],
        ),
    ],
)

# SqlWriter target: table supply.segambev_atos_inseguros, overwritten on each
# run. temp_path/temp_container name a temporary location the writer uses
# (presumably for staging; confirm against SqlWriter).
dw_config = DwWriterConfig(
    schema="supply",
    table_name="segambev_atos_inseguros",
    mode="overwrite",
    temp_path="Brazil/Segambev/AtosInseguros",
    temp_container="consumezone",
)

# PrestoWriter target: table segambev.atos_inseguros backed by parquet data at
# the given consumezone path.
presto_config = PrestoConfig(
    format='parquet',
    path='Seguranca/AtosInseguros',
    country='Brazil',
    mount_name='consumezone',
    schema='segambev',
    table_name='atos_inseguros',
)

# FileWriter target: a parquet copy on the `bifrost` mount, overwritten on
# each run.
third_blob_config = FileSystemConfig(
    format='parquet',
    path='Seguranca/AtosInseguros',
    country='Brazil',
    mount_name='bifrost',
    mode='overwrite',
)

# Every writer listed here receives the task's output.
writers = [
    FileWriter(config=third_blob_config),
    PrestoWriter(config=presto_config),
    SqlWriter(config=dw_config),
]

load_service = LoadService(writers=writers)

Set up a Task object and run it with the ExtractService, TransformService and LoadService:

from pyiris.infrastructure import Spark
from pyiris.ingestion.task.task import Task

# Spark wrapper passed to the task at run time.
pyiris_spark = Spark()

# Full pipeline: extract, transform and load services all supplied.
task = Task(
    extract_service=extract_service,
    transform_service=transform_service,
    load_service=load_service,
)
task.run(spark=pyiris_spark)

Alternatively, you can set up a Task object with only an ExtractService and a LoadService (omitting the TransformService) and run it:

from pyiris.infrastructure import Spark
from pyiris.ingestion.task.task import Task

# Spark wrapper passed to the task at run time.
pyiris_spark = Spark()

# Extract-and-load only: no transform_service is passed.
task = Task(
    extract_service=extract_service,
    load_service=load_service,
)
task.run(spark=pyiris_spark)