Task
The task object is responsible for executing ExtractService, TransformService and LoadService.
Example code
Set up the ExtractService, TransformService and LoadService objects:
from pyiris.ingestion.extract import FileReader, ExtractService
from pyiris.ingestion.transform import TransformService, HashTransformation, SqlTransformation
from pyiris.ingestion.config.dw_config import DwWriterConfig
from pyiris.ingestion.load.writers.presto_writer import PrestoWriter
from pyiris.ingestion.config.presto_config import PrestoConfig
from pyiris.ingestion.load.writers.sql_writer import SqlWriter
from pyiris.ingestion.config.file_system_config import FileSystemConfig
from pyiris.ingestion.load import LoadService, FileWriter
# Sources to extract: two parquet datasets under the "consumezone" mount.
readers = [
    FileReader(
        table_id="atos_inseguros",
        mount_name="consumezone",
        country="Brazil",
        path="Seguranca/AtosInseguros",
        format="parquet",
    ),
    FileReader(
        table_id="condicao_insegura",
        mount_name="consumezone",
        country="Brazil",
        path="Seguranca/CondicaoInsegura",
        format="parquet",
    ),
]

# The table names used in the query match the table_id of each reader above.
query = """
SELECT
*
FROM
atos_inseguros
INNER JOIN
condicao_insegura
ON
atos_inseguros.ID == condicao_insegura.ID
"""

extract_service = ExtractService(readers=readers, query=query)
# Two transformations: a SQL expression that writes a new column, and a hash
# over an existing column.
transform_service = TransformService(
    transformations=[
        SqlTransformation(
            name="divide",
            description="Getting middle price",
            to_column="middle_price",
            sql_expression="price/quantity",
        ),
        HashTransformation(
            name="Hash CPF",
            description="Hash CPF to be according to LGPD",
            from_columns=["seller_cpf"],
        ),
    ]
)
# One config object per destination, declared in the same order as the
# writers list below.

# File system destination (used by FileWriter).
third_blob_config = FileSystemConfig(
    format="parquet",
    path="Seguranca/AtosInseguros",
    country="Brazil",
    mount_name="bifrost",
    mode="overwrite",
)

# Presto destination (used by PrestoWriter).
presto_config = PrestoConfig(
    format="parquet",
    path="Seguranca/AtosInseguros",
    country="Brazil",
    mount_name="consumezone",
    schema="segambev",
    table_name="atos_inseguros",
)

# Data warehouse destination (used by SqlWriter).
dw_config = DwWriterConfig(
    schema="supply",
    table_name="segambev_atos_inseguros",
    mode="overwrite",
    temp_path="Brazil/Segambev/AtosInseguros",
    temp_container="consumezone",
)

writers = [
    FileWriter(config=third_blob_config),
    PrestoWriter(config=presto_config),
    SqlWriter(config=dw_config),
]

load_service = LoadService(writers=writers)
Set up the Task object and run it using ExtractService, TransformService and LoadService:
from pyiris.infrastructure import Spark
from pyiris.ingestion.task.task import Task
# Spark wrapper handed to the task when it runs.
pyiris_spark = Spark()

# Task wired with all three services: extract, transform and load.
task = Task(
    extract_service=extract_service,
    transform_service=transform_service,
    load_service=load_service,
)

task.run(spark=pyiris_spark)
Alternatively, you can set up a Task object and run it using only ExtractService and LoadService:
from pyiris.infrastructure import Spark
from pyiris.ingestion.task.task import Task
# Spark wrapper handed to the task when it runs.
pyiris_spark = Spark()

# Task wired with extract and load only; no transform_service is passed.
task = Task(
    extract_service=extract_service,
    load_service=load_service,
)

task.run(spark=pyiris_spark)