Описание DAG для генерации данных в Apache Airflow

Описание dataset_generation_dag

Текст скрипта
import pandas as pd
import numpy as np
import warnings
import random as r
from datetime import datetime, timedelta
import sqlalchemy
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow import DAG
from airflow.models import Variable
from airflow.models.param import Param
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task

warnings.filterwarnings('ignore')

all_services = ['Основное производство', 'Гражданское производство', 'Сервисное обслуживание', 'Прочие работы',
                'Собственнные нужды']
main_cit_service = all_services[:3]

dag = DAG(
    dag_id='dataset_generation_dag_tasking',
    schedule='@once',
    start_date=datetime(2018, 11, 1),
    description='for data generation using users parameters',
    params={
        'start_date': Param('2005-01-01 00:00:00', type='string',
                            description='start date in format 2005-01-01 00:00:00'),
        'end_date': Param('2024-03-01 23:59:59', type='string', description='end date in format 2024-03-01 23:59:59'),
        'table_name': Param('table_name', type='string', description='name of table to put generated strings'),
        'connection_id': Param('connnection_id', type='string', description='connection id from apache airflow'),
        'amount_rows': Param(1000, type='integer', description='name of table to put generated strings', minimum=1000),
        'num_instances': Param(1, type='integer', description='how many dags will work in parallel', minimum=1),
        'min_woo_number': Param(1000, type='integer', description='minimum woo_number', minimum=1),
        'max_woo_number': Param(10000, type='integer', description='maximum woo_number', minimum=1)
    },
    render_template_as_native_obj=True
)


def zero_probability_generation(prob):
    if r.random() > prob:
        return False
    else:
        return True


def date_generation(start_date, end_date):
    delta = end_date - start_date
    random_seconds = r.randint(0, int(delta.total_seconds()))
    random_date = start_date + timedelta(seconds=random_seconds)
    return random_date


def generate_all_dates(start_date, end_date, amount):
    erc_dates = []
    end_dates = []
    start_date = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')
    end_date = datetime.strptime(end_date, '%Y-%m-%d %H:%M:%S')
    total_seconds = int((end_date - start_date).total_seconds())
    # random_seconds = np.random.choice(range(1, total_seconds), amount, replace=False) # Generates uniq values. Consuming a lot of resources.
    random_seconds = np.around(np.random.uniform(1, total_seconds, amount), 0)
    begin_dates = [start_date + timedelta(seconds=int(sec)) for sec in
                   random_seconds]
    for woo_begin_date in begin_dates:
        erc_date = date_generation(woo_begin_date, end_date).replace(day=r.choice([1, 25]), hour=0, minute=0, second=0,
                                                                     microsecond=0)
        if zero_probability_generation(0.0045) \
                and len(end_dates) != 0 \
                and end_dates[len(end_dates) - 1] is not None:
            woo_end_date = date_generation(woo_begin_date, end_date)
        else:
            woo_end_date = None
        erc_dates.append(erc_date)
        end_dates.append(woo_end_date)
    print('Date generation is complete')
    return begin_dates, erc_dates, end_dates


def columns_generation(woo_number, start_date, end_date, size):
    woo_number = [[woo_number] * size][0]
    woo_begin, erc_date, woo_end = generate_all_dates(start_date, end_date, size)
    woo_state_title = ['закрыт' if i is not None else None for i in woo_end]
    woosh_overhead_pct_woprt_title = [[r.choice(all_services)] * size][0]
    erc_workshop = np.random.randint(1, 90, size)
    if woosh_overhead_pct_woprt_title[0] in main_cit_service:
        erc_cable_sum_deb = [i if ws in range(1, 31) and zero_probability_generation(0.9) else 0 for ws, i in
                             zip(erc_workshop, np.around(np.random.exponential(500000000.0, size), 1))]
        erc_cable_sum_cred = [i if ws in range(1, 31) and zero_probability_generation(0.9) else 0 for ws, i in
                              zip(erc_workshop, np.around(np.random.exponential(500000000.0, size), 1))]
        erc_semi_product_sum_deb = [i if zero_probability_generation(0.5) else 0 for i in
                                    np.around(np.random.exponential(500000000.0, size))]
        erc_semi_product_sum_cred = [i if zero_probability_generation(0.5) else 0 for i in
                                     np.around(np.random.exponential(500000000.0, size), 1)]
        erc_accessory_mat_sum_deb = [i if zero_probability_generation(0.9) else 0 for i in
                                     np.around(np.random.exponential(500000000.0, size), 1)]
        erc_accessory_mat_sum_cred = [i if zero_probability_generation(0.9) else 0 for i in
                                      np.around(np.random.exponential(500000000.0, size), 1)]
        erc_contractor_service_deb = [i if ws in range(1, 11) and zero_probability_generation(0.1) else 0 for
                                      ws, i in
                                      zip(erc_workshop, np.around(np.random.exponential(500000000.0, size), 1))]
        erc_contractor_service_cred = [i if ws in range(1, 11) and zero_probability_generation(0.1) else 0 for
                                       ws, i in
                                       zip(erc_workshop, np.around(np.random.exponential(500000000.0, size), 1))]
    else:
        erc_cable_sum_deb = [[0] * size][0]
        erc_cable_sum_cred = [[0] * size][0]
        erc_semi_product_sum_deb = [[0] * size][0]
        erc_semi_product_sum_cred = [[0] * size][0]
        erc_accessory_mat_sum_deb = [[0] * size][0]
        erc_accessory_mat_sum_cred = [[0] * size][0]
        erc_contractor_service_deb = [[0] * size][0]
        erc_contractor_service_cred = [[0] * size][0]
    erc_recycle_waste_deb = [i if zero_probability_generation(0.05) else 0 for i in
                             np.around(np.random.uniform(0.00, -100000.00, size), 1)]
    erc_recycle_waste_cred = [i if zero_probability_generation(0.05) else 0 for i in
                              np.around(np.random.uniform(0.00, -100000.00, size), 1)]
    erc_labor_deb = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_labor_cred = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_main_salary_deb = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_main_salary_cred = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_addit_salary_deb = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_addit_salary_cred = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_uni_social_tax_deb = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_uni_social_tax_cred = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_production_salary_deb = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_production_salary_cred = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_production_amort_deb = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_production_amort_cred = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_production_others_deb = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_production_others_cred = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_household_salary_deb = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_household_salary_cred = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_household_others_deb = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_household_others_cred = np.around(np.random.uniform(0, 999999999, size), 1)
    erc_special_deb = [i if zero_probability_generation(0.1) else 0 for i in
                       np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_special_cred = [i if zero_probability_generation(0.1) else 0 for i in
                        np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_others_deb = [i if zero_probability_generation(0.1) else 0 for i in
                      np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_others_cred = [i if zero_probability_generation(0.1) else 0 for i in
                       np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_semi_fineshed_deb = [i if zero_probability_generation(0.5) else 0 for i in
                             np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_semi_fineshed_cred = [i if zero_probability_generation(0.5) else 0 for i in
                              np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_components_deb = [i if zero_probability_generation(0.1) else 0 for i in
                          np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_components_cred = [i if zero_probability_generation(0.1) else 0 for i in
                           np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_container_deb = [i if zero_probability_generation(0.1) else 0 for i in
                         np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_container_cred = [i if zero_probability_generation(0.1) else 0 for i in
                          np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_spec_equipment_deb = [i if zero_probability_generation(0.1) else 0 for i in
                              np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_spec_equipment_cred = [i if zero_probability_generation(0.1) else 0 for i in
                               np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_travel_day_deb = [i if zero_probability_generation(0.05) else 0 for i in
                          np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_travel_day_cred = [i if zero_probability_generation(0.05) else 0 for i in
                           np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_travel_liveing_deb = [i if zero_probability_generation(0.05) else 0 for i in
                              np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_travel_liveing_cred = [i if zero_probability_generation(0.05) else 0 for i in
                               np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_travel_transp_deb = [i if zero_probability_generation(0.05) else 0 for i in
                             np.around(np.random.uniform(0, 999999999, size), 1)]
    erc_travel_transp_cred = [i if zero_probability_generation(0.05) else 0 for i in
                              np.around(np.random.uniform(0, 999999999, size), 1)]
    data = {
        'woo_number': woo_number,
        'woo_begin': woo_begin,
        'woo_end': woo_end,
        'woo_state_title': woo_state_title,
        'woosh_overhead_pct_woprt_title': woosh_overhead_pct_woprt_title,
        'erc_date': erc_date,
        'erc_workshop': erc_workshop,
        'erc_cable_sum_deb': erc_cable_sum_deb,
        'erc_cable_sum_cred': erc_cable_sum_cred,
        'erc_semi_product_sum_deb': erc_semi_product_sum_deb,
        'erc_semi_product_sum_cred': erc_semi_product_sum_cred,
        'erc_accessory_mat_sum_deb': erc_accessory_mat_sum_deb,
        'erc_accessory_mat_sum_cred': erc_accessory_mat_sum_cred,
        'erc_recycle_waste_deb': erc_recycle_waste_deb,
        'erc_recycle_waste_cred': erc_recycle_waste_cred,
        'erc_labor_deb': erc_labor_deb,
        'erc_labor_cred': erc_labor_cred,
        'erc_main_salary_deb': erc_main_salary_deb,
        'erc_main_salary_cred': erc_main_salary_cred,
        'erc_addit_salary_deb': erc_addit_salary_deb,
        'erc_addit_salary_cred': erc_addit_salary_cred,
        'erc_uni_social_tax_deb': erc_uni_social_tax_deb,
        'erc_uni_social_tax_cred': erc_uni_social_tax_cred,
        'erc_production_salary_deb': erc_production_salary_deb,
        'erc_production_salary_cred': erc_production_salary_cred,
        'erc_production_amort_deb': erc_production_amort_deb,
        'erc_production_amort_cred': erc_production_amort_cred,
        'erc_production_others_deb': erc_production_others_deb,
        'erc_production_others_cred': erc_production_others_cred,
        'erc_household_salary_deb': erc_household_salary_deb,
        'erc_household_salary_cred': erc_household_salary_cred,
        'erc_household_others_deb': erc_household_others_deb,
        'erc_household_others_cred': erc_household_others_cred,
        'erc_contractor_service_deb': erc_contractor_service_deb,
        'erc_contractor_service_cred': erc_contractor_service_cred,
        'erc_special_deb': erc_special_deb,
        'erc_special_cred': erc_special_cred,
        'erc_others_deb': erc_others_deb,
        'erc_others_cred': erc_others_cred,
        'erc_semi_fineshed_deb': erc_semi_fineshed_deb,
        'erc_semi_fineshed_cred': erc_semi_fineshed_cred,
        'erc_components_deb': erc_components_deb,
        'erc_components_cred': erc_components_cred,
        'erc_container_deb': erc_container_deb,
        'erc_container_cred': erc_container_cred,
        'erc_spec_equipment_deb': erc_spec_equipment_deb,
        'erc_spec_equipment_cred': erc_spec_equipment_cred,
        'erc_travel_day_deb': erc_travel_day_deb,
        'erc_travel_day_cred': erc_travel_day_cred,
        'erc_travel_liveing_deb': erc_travel_liveing_deb,
        'erc_travel_liveing_cred': erc_travel_liveing_cred,
        'erc_travel_transp_deb': erc_travel_transp_deb,
        'erc_travel_transp_cred': erc_travel_transp_cred}
    return data


def pandas_workflow(woo_number, start_date, end_date, size):
    data = columns_generation(woo_number, start_date, end_date, size)
    df = pd.DataFrame(data)
    df = df.sort_values(by=['woo_begin', 'erc_date']).reset_index(drop=True)
    df['erc_cable_transp_deb'] = df['erc_cable_sum_deb'] * 0.01

    df['erc_cable_transp_cred'] = df['erc_cable_sum_cred'] * 0.01

    df['erc_semi_product_transp_deb'] = df['erc_semi_product_sum_deb'] * 0.01

    df['erc_semi_product_transp_cred'] = df['erc_semi_product_sum_cred'] * 0.01

    df['erc_accessory_mat_transp_deb'] = df['erc_accessory_mat_sum_deb'] * 0.01

    df['erc_accessory_mat_transp_cred'] = df['erc_accessory_mat_sum_cred'] * 0.01

    df['erc_semi_fineshed_transp_deb'] = df['erc_semi_fineshed_deb'] * 0.01

    df['erc_semi_fineshed_transp_cred'] = df['erc_semi_fineshed_cred'] * 0.01

    df['erc_total_deb'] = df['erc_cable_sum_deb'] + df['erc_cable_transp_deb'] + df['erc_semi_product_sum_deb'] + df[
        'erc_semi_product_transp_deb'] + df['erc_accessory_mat_sum_deb'] + df['erc_accessory_mat_transp_deb'] + df[
                              'erc_recycle_waste_deb'] + df['erc_main_salary_deb'] + df['erc_addit_salary_deb'] + df[
                              'erc_uni_social_tax_deb'] + df['erc_production_salary_deb'] + df[
                              'erc_production_amort_deb'] + df['erc_production_others_deb'] + df[
                              'erc_household_salary_deb'] + df['erc_household_others_deb'] + df[
                              'erc_contractor_service_deb'] + df['erc_special_deb'] + df['erc_others_deb'] + df[
                              'erc_semi_fineshed_deb'] + df['erc_semi_fineshed_transp_deb'] + df['erc_components_deb'] + \
                          df['erc_container_deb'] + df['erc_spec_equipment_deb'] + df['erc_travel_day_deb'] + df[
                              'erc_travel_liveing_deb'] + df['erc_travel_transp_deb']
    df['erc_total_cred'] = df['erc_cable_sum_deb'] + df['erc_cable_transp_deb'] + df['erc_semi_product_sum_deb'] + df[
        'erc_semi_product_transp_deb'] + df['erc_accessory_mat_sum_deb'] + df['erc_accessory_mat_transp_deb'] + df[
                               'erc_recycle_waste_deb'] + df['erc_main_salary_deb'] + df['erc_addit_salary_deb'] + df[
                               'erc_uni_social_tax_deb'] + df['erc_production_salary_deb'] + df[
                               'erc_production_amort_deb'] + df['erc_production_others_deb'] + df[
                               'erc_household_salary_deb'] + df['erc_household_others_deb'] + df[
                               'erc_contractor_service_deb'] + df['erc_special_deb'] + df['erc_others_deb'] + df[
                               'erc_semi_fineshed_deb'] + df['erc_semi_fineshed_transp_deb'] + df[
                               'erc_components_deb'] + df['erc_container_deb'] + df['erc_spec_equipment_deb'] + df[
                               'erc_travel_day_deb'] + df['erc_travel_liveing_deb'] + df['erc_travel_transp_cred']

    columns = df.columns.to_list()
    deb_columns = [col for col in columns if col.endswith('_deb')]
    cred_columns = [col for col in columns if col.endswith('_cred')]
    for deb_column, cred_column in zip(deb_columns, cred_columns):
        if cred_column.endswith('_cred'):
            bal_col = cred_column.replace('_cred', '_bal')
            df[bal_col] = 0
            for i in range(len(df)):
                if i == 0 or df.loc[i - 1, 'woo_state_title'] == 'закрыт':
                    df.loc[i, bal_col] = df.loc[i, deb_column] - df.loc[i, cred_column]
                else:
                    if df.loc[i, 'woo_state_title'] == 'закрыт':
                        df.loc[i, bal_col] = 0
                        df.loc[i, deb_column] = df.loc[i, cred_column] - df.loc[i - 1, bal_col]
                    else:
                        df.loc[i, bal_col] = df.loc[i - 1, bal_col] + df.loc[i, deb_column] - df.loc[i, cred_column]

    column_order = []
    for col in columns:
        if col.endswith('_cred'):
            column_order.append(col)
            column_order.append(col.replace('_cred', '_bal'))
        else:
            column_order.append(col)
    df = df.round(1)
    # df[column_order].to_csv('df.csv', encoding='cp1251', index=False)
    print(f'batch with {size} rows is ready')
    return df[column_order]


@task(dag=dag)
def dataframe_to_ch(arg_list):
    woo_number_start, woo_number_end, start_date, end_date, size, table_name, connection_id = arg_list
    client = PostgresHook(postgres_conn_id=connection_id)
    start_time = datetime.now()
    print(f'batch generation started at : {start_time}')
    for woo_number in range(woo_number_start, woo_number_end):
        df = pandas_workflow(woo_number, start_date, end_date, size)
        client.insert_rows(table=table_name, rows=df.values.tolist())
        end_time = datetime.now()
        print(f'Batch with woo_numbers {woo_number} was generated. Time has passed: {end_time - start_time}')


@task(dag=dag)
def running_multiple_task(**kwargs):
    task_list = []
    start_date = kwargs['params'].get('start_date')
    end_date = kwargs['params'].get('end_date')
    table_name = kwargs['params'].get('table_name')
    connection_id = kwargs['params'].get('connection_id')
    amount_rows = kwargs['params'].get('amount_rows')
    num_instances = kwargs['params'].get('num_instances')
    min_woo_number = kwargs['params'].get('min_woo_number')
    max_woo_number = kwargs['params'].get('max_woo_number')
    num_per_process = (max_woo_number - min_woo_number) // num_instances
    rows_per_woo_number = amount_rows // (max_woo_number - min_woo_number)
    for i in range(num_instances):
        min_instance_woo_number = min_woo_number + i * num_per_process
        max_instance_woo_number = min_woo_number + (i + 1) * num_per_process
        task_list.append(
            [min_instance_woo_number, max_instance_woo_number, start_date, end_date, rows_per_woo_number, table_name,
             connection_id])
    return task_list


dataframe_to_ch.partial().expand(arg_list=running_multiple_task())

Для запуска необходимо нажать на кнопку с треугольником и заполнить параметры.

Параметры

После запуска необходимо заполнить поля с параметрами, исходя из которых будут сгенерированы данные. У каждого параметра есть описание и заполненное значение по умолчанию. Подробнее о параметрах:

  • start_date - начиная с какой даты генерировать данные
  • end_date - по какую дату генерировать значения
  • table_name - название таблицы, куда сложить данные
  • connection_id - id подключения в Apache Airflow
  • amount_rows - общее количество строк которое необходимо сгенерировать
  • num_instances - на сколько инстансов разбить задачу
  • min_woo_number - начиная с какого номера генерировать значения
  • max_woo_number - по какой номер генерировать значения

Исходя из полученных значений рассчитывается сколько строк необходимо на каждый woo_number, список всех woo_number, а также сколько строк должен сгенерировать каждый таск (num_instances).

Функции

В скрипте функции принимают на себя полученные из переменных аргументы, и исходя из этого рассчитывают показатели для каждой колонки.

zero_probability_generation

Вероятность получения 0

Принимает на себя в качестве аргумента число от 0 до 1 (вероятность). Генерирует случайное число в этом диапазоне, и сравнивает с принятым числом, и исходя из результата возвращает False или True.

date_generation

Генератор даты

Принимает на себя в качестве аргумента 2 даты в формате datetime, и генерирует случайную дату между ними. Эту случайную дату и возвращает.

generate_all_dates

Генератор всех дат для итоговой таблицы.

Принимает на себя в качестве аргумента 2 даты в формате строк в формате ‘%Y-%m-%d %H:%M:%S’ и число, обозначающее количество строк, которое необходимо сгенерировать.

Данная функция использует функцию date_generation для генерации дат. Также учитывает специфику проекта по датам.

Возвращает 3 списка сгенерированных дат.

columns_generation

Генератор колонок

Принимает на себя в качестве аргумента номер для колонки woo_number, 2 даты в формате строк в формате ‘%Y-%m-%d %H:%M:%S’ и число, обозначающее количество строк, которое необходимо сгенерировать.

Для получения колонок дат обращается к функции generate_all_dates, генерирует остальные колонки внутри себя.

Каждая колонка - список значений.

Затем преобразует все в словарь, где каждому ключу соответствует ранее сгенерированный список значений. Название каждого ключа должно совпадать с наименованием колонок заранее созданной таблицы.

Возвращает полученный словарь.

pandas_workflow

Рабочее пространство пандаса.

Принимает на себя в качестве аргумента номер для колонки woo_number, 2 даты в формате строк в формате ‘%Y-%m-%d %H:%M:%S’ и число, обозначающее количество строк, которое необходимо сгенерировать.

Получает словарь из функции columns_generation. Преобразовывает данный словарь в формат пандаса, производит вычисления согласно специфики проекта.

Формирует список из названия колонок в нужном порядке. Возвращает датафрейм с колонками в нужном порядке.

dataframe_to_ch

Датафрейм в БД

Принимает на себя в качестве аргумента список из значений полученных при заполнении параметров. Формирует датафреймы с помощью функции pandas_workflow, и записывает их в БД.

running_multiple_task

Запуск нескольких заданий

Принимает на себя произвольное число именованных аргументов, полученных из Airflow. Исходя из полученных аргументов вычисляет количество и диапазон номеров на каждый инстанс. Полученные значения складывает в список для дальнейшего запуска.

2 Likes