Описание dataset_generation_dag
Текст скрипта
import pandas as pd
import numpy as np
import warnings
import random as r
from datetime import datetime, timedelta
import sqlalchemy
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow import DAG
from airflow.models import Variable
from airflow.models.param import Param
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task
warnings.filterwarnings('ignore')
all_services = ['Основное производство', 'Гражданское производство', 'Сервисное обслуживание', 'Прочие работы',
'Собственнные нужды']
main_cit_service = all_services[:3]
dag = DAG(
dag_id='dataset_generation_dag_tasking',
schedule='@once',
start_date=datetime(2018, 11, 1),
description='for data generation using users parameters',
params={
'start_date': Param('2005-01-01 00:00:00', type='string',
description='start date in format 2005-01-01 00:00:00'),
'end_date': Param('2024-03-01 23:59:59', type='string', description='end date in format 2024-03-01 23:59:59'),
'table_name': Param('table_name', type='string', description='name of table to put generated strings'),
'connection_id': Param('connnection_id', type='string', description='connection id from apache airflow'),
'amount_rows': Param(1000, type='integer', description='name of table to put generated strings', minimum=1000),
'num_instances': Param(1, type='integer', description='how many dags will work in parallel', minimum=1),
'min_woo_number': Param(1000, type='integer', description='minimum woo_number', minimum=1),
'max_woo_number': Param(10000, type='integer', description='maximum woo_number', minimum=1)
},
render_template_as_native_obj=True
)
def zero_probability_generation(prob):
if r.random() > prob:
return False
else:
return True
def date_generation(start_date, end_date):
delta = end_date - start_date
random_seconds = r.randint(0, int(delta.total_seconds()))
random_date = start_date + timedelta(seconds=random_seconds)
return random_date
def generate_all_dates(start_date, end_date, amount):
erc_dates = []
end_dates = []
start_date = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')
end_date = datetime.strptime(end_date, '%Y-%m-%d %H:%M:%S')
total_seconds = int((end_date - start_date).total_seconds())
# random_seconds = np.random.choice(range(1, total_seconds), amount, replace=False) # Generates uniq values. Consuming a lot of resources.
random_seconds = np.around(np.random.uniform(1, total_seconds, amount), 0)
begin_dates = [start_date + timedelta(seconds=int(sec)) for sec in
random_seconds]
for woo_begin_date in begin_dates:
erc_date = date_generation(woo_begin_date, end_date).replace(day=r.choice([1, 25]), hour=0, minute=0, second=0,
microsecond=0)
if zero_probability_generation(0.0045) \
and len(end_dates) != 0 \
and end_dates[len(end_dates) - 1] is not None:
woo_end_date = date_generation(woo_begin_date, end_date)
else:
woo_end_date = None
erc_dates.append(erc_date)
end_dates.append(woo_end_date)
print('Date generation is complete')
return begin_dates, erc_dates, end_dates
def columns_generation(woo_number, start_date, end_date, size):
woo_number = [[woo_number] * size][0]
woo_begin, erc_date, woo_end = generate_all_dates(start_date, end_date, size)
woo_state_title = ['закрыт' if i is not None else None for i in woo_end]
woosh_overhead_pct_woprt_title = [[r.choice(all_services)] * size][0]
erc_workshop = np.random.randint(1, 90, size)
if woosh_overhead_pct_woprt_title[0] in main_cit_service:
erc_cable_sum_deb = [i if ws in range(1, 31) and zero_probability_generation(0.9) else 0 for ws, i in
zip(erc_workshop, np.around(np.random.exponential(500000000.0, size), 1))]
erc_cable_sum_cred = [i if ws in range(1, 31) and zero_probability_generation(0.9) else 0 for ws, i in
zip(erc_workshop, np.around(np.random.exponential(500000000.0, size), 1))]
erc_semi_product_sum_deb = [i if zero_probability_generation(0.5) else 0 for i in
np.around(np.random.exponential(500000000.0, size))]
erc_semi_product_sum_cred = [i if zero_probability_generation(0.5) else 0 for i in
np.around(np.random.exponential(500000000.0, size), 1)]
erc_accessory_mat_sum_deb = [i if zero_probability_generation(0.9) else 0 for i in
np.around(np.random.exponential(500000000.0, size), 1)]
erc_accessory_mat_sum_cred = [i if zero_probability_generation(0.9) else 0 for i in
np.around(np.random.exponential(500000000.0, size), 1)]
erc_contractor_service_deb = [i if ws in range(1, 11) and zero_probability_generation(0.1) else 0 for
ws, i in
zip(erc_workshop, np.around(np.random.exponential(500000000.0, size), 1))]
erc_contractor_service_cred = [i if ws in range(1, 11) and zero_probability_generation(0.1) else 0 for
ws, i in
zip(erc_workshop, np.around(np.random.exponential(500000000.0, size), 1))]
else:
erc_cable_sum_deb = [[0] * size][0]
erc_cable_sum_cred = [[0] * size][0]
erc_semi_product_sum_deb = [[0] * size][0]
erc_semi_product_sum_cred = [[0] * size][0]
erc_accessory_mat_sum_deb = [[0] * size][0]
erc_accessory_mat_sum_cred = [[0] * size][0]
erc_contractor_service_deb = [[0] * size][0]
erc_contractor_service_cred = [[0] * size][0]
erc_recycle_waste_deb = [i if zero_probability_generation(0.05) else 0 for i in
np.around(np.random.uniform(0.00, -100000.00, size), 1)]
erc_recycle_waste_cred = [i if zero_probability_generation(0.05) else 0 for i in
np.around(np.random.uniform(0.00, -100000.00, size), 1)]
erc_labor_deb = np.around(np.random.uniform(0, 999999999, size), 1)
erc_labor_cred = np.around(np.random.uniform(0, 999999999, size), 1)
erc_main_salary_deb = np.around(np.random.uniform(0, 999999999, size), 1)
erc_main_salary_cred = np.around(np.random.uniform(0, 999999999, size), 1)
erc_addit_salary_deb = np.around(np.random.uniform(0, 999999999, size), 1)
erc_addit_salary_cred = np.around(np.random.uniform(0, 999999999, size), 1)
erc_uni_social_tax_deb = np.around(np.random.uniform(0, 999999999, size), 1)
erc_uni_social_tax_cred = np.around(np.random.uniform(0, 999999999, size), 1)
erc_production_salary_deb = np.around(np.random.uniform(0, 999999999, size), 1)
erc_production_salary_cred = np.around(np.random.uniform(0, 999999999, size), 1)
erc_production_amort_deb = np.around(np.random.uniform(0, 999999999, size), 1)
erc_production_amort_cred = np.around(np.random.uniform(0, 999999999, size), 1)
erc_production_others_deb = np.around(np.random.uniform(0, 999999999, size), 1)
erc_production_others_cred = np.around(np.random.uniform(0, 999999999, size), 1)
erc_household_salary_deb = np.around(np.random.uniform(0, 999999999, size), 1)
erc_household_salary_cred = np.around(np.random.uniform(0, 999999999, size), 1)
erc_household_others_deb = np.around(np.random.uniform(0, 999999999, size), 1)
erc_household_others_cred = np.around(np.random.uniform(0, 999999999, size), 1)
erc_special_deb = [i if zero_probability_generation(0.1) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_special_cred = [i if zero_probability_generation(0.1) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_others_deb = [i if zero_probability_generation(0.1) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_others_cred = [i if zero_probability_generation(0.1) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_semi_fineshed_deb = [i if zero_probability_generation(0.5) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_semi_fineshed_cred = [i if zero_probability_generation(0.5) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_components_deb = [i if zero_probability_generation(0.1) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_components_cred = [i if zero_probability_generation(0.1) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_container_deb = [i if zero_probability_generation(0.1) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_container_cred = [i if zero_probability_generation(0.1) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_spec_equipment_deb = [i if zero_probability_generation(0.1) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_spec_equipment_cred = [i if zero_probability_generation(0.1) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_travel_day_deb = [i if zero_probability_generation(0.05) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_travel_day_cred = [i if zero_probability_generation(0.05) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_travel_liveing_deb = [i if zero_probability_generation(0.05) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_travel_liveing_cred = [i if zero_probability_generation(0.05) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_travel_transp_deb = [i if zero_probability_generation(0.05) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
erc_travel_transp_cred = [i if zero_probability_generation(0.05) else 0 for i in
np.around(np.random.uniform(0, 999999999, size), 1)]
data = {
'woo_number': woo_number,
'woo_begin': woo_begin,
'woo_end': woo_end,
'woo_state_title': woo_state_title,
'woosh_overhead_pct_woprt_title': woosh_overhead_pct_woprt_title,
'erc_date': erc_date,
'erc_workshop': erc_workshop,
'erc_cable_sum_deb': erc_cable_sum_deb,
'erc_cable_sum_cred': erc_cable_sum_cred,
'erc_semi_product_sum_deb': erc_semi_product_sum_deb,
'erc_semi_product_sum_cred': erc_semi_product_sum_cred,
'erc_accessory_mat_sum_deb': erc_accessory_mat_sum_deb,
'erc_accessory_mat_sum_cred': erc_accessory_mat_sum_cred,
'erc_recycle_waste_deb': erc_recycle_waste_deb,
'erc_recycle_waste_cred': erc_recycle_waste_cred,
'erc_labor_deb': erc_labor_deb,
'erc_labor_cred': erc_labor_cred,
'erc_main_salary_deb': erc_main_salary_deb,
'erc_main_salary_cred': erc_main_salary_cred,
'erc_addit_salary_deb': erc_addit_salary_deb,
'erc_addit_salary_cred': erc_addit_salary_cred,
'erc_uni_social_tax_deb': erc_uni_social_tax_deb,
'erc_uni_social_tax_cred': erc_uni_social_tax_cred,
'erc_production_salary_deb': erc_production_salary_deb,
'erc_production_salary_cred': erc_production_salary_cred,
'erc_production_amort_deb': erc_production_amort_deb,
'erc_production_amort_cred': erc_production_amort_cred,
'erc_production_others_deb': erc_production_others_deb,
'erc_production_others_cred': erc_production_others_cred,
'erc_household_salary_deb': erc_household_salary_deb,
'erc_household_salary_cred': erc_household_salary_cred,
'erc_household_others_deb': erc_household_others_deb,
'erc_household_others_cred': erc_household_others_cred,
'erc_contractor_service_deb': erc_contractor_service_deb,
'erc_contractor_service_cred': erc_contractor_service_cred,
'erc_special_deb': erc_special_deb,
'erc_special_cred': erc_special_cred,
'erc_others_deb': erc_others_deb,
'erc_others_cred': erc_others_cred,
'erc_semi_fineshed_deb': erc_semi_fineshed_deb,
'erc_semi_fineshed_cred': erc_semi_fineshed_cred,
'erc_components_deb': erc_components_deb,
'erc_components_cred': erc_components_cred,
'erc_container_deb': erc_container_deb,
'erc_container_cred': erc_container_cred,
'erc_spec_equipment_deb': erc_spec_equipment_deb,
'erc_spec_equipment_cred': erc_spec_equipment_cred,
'erc_travel_day_deb': erc_travel_day_deb,
'erc_travel_day_cred': erc_travel_day_cred,
'erc_travel_liveing_deb': erc_travel_liveing_deb,
'erc_travel_liveing_cred': erc_travel_liveing_cred,
'erc_travel_transp_deb': erc_travel_transp_deb,
'erc_travel_transp_cred': erc_travel_transp_cred}
return data
def pandas_workflow(woo_number, start_date, end_date, size):
data = columns_generation(woo_number, start_date, end_date, size)
df = pd.DataFrame(data)
df = df.sort_values(by=['woo_begin', 'erc_date']).reset_index(drop=True)
df['erc_cable_transp_deb'] = df['erc_cable_sum_deb'] * 0.01
df['erc_cable_transp_cred'] = df['erc_cable_sum_cred'] * 0.01
df['erc_semi_product_transp_deb'] = df['erc_semi_product_sum_deb'] * 0.01
df['erc_semi_product_transp_cred'] = df['erc_semi_product_sum_cred'] * 0.01
df['erc_accessory_mat_transp_deb'] = df['erc_accessory_mat_sum_deb'] * 0.01
df['erc_accessory_mat_transp_cred'] = df['erc_accessory_mat_sum_cred'] * 0.01
df['erc_semi_fineshed_transp_deb'] = df['erc_semi_fineshed_deb'] * 0.01
df['erc_semi_fineshed_transp_cred'] = df['erc_semi_fineshed_cred'] * 0.01
df['erc_total_deb'] = df['erc_cable_sum_deb'] + df['erc_cable_transp_deb'] + df['erc_semi_product_sum_deb'] + df[
'erc_semi_product_transp_deb'] + df['erc_accessory_mat_sum_deb'] + df['erc_accessory_mat_transp_deb'] + df[
'erc_recycle_waste_deb'] + df['erc_main_salary_deb'] + df['erc_addit_salary_deb'] + df[
'erc_uni_social_tax_deb'] + df['erc_production_salary_deb'] + df[
'erc_production_amort_deb'] + df['erc_production_others_deb'] + df[
'erc_household_salary_deb'] + df['erc_household_others_deb'] + df[
'erc_contractor_service_deb'] + df['erc_special_deb'] + df['erc_others_deb'] + df[
'erc_semi_fineshed_deb'] + df['erc_semi_fineshed_transp_deb'] + df['erc_components_deb'] + \
df['erc_container_deb'] + df['erc_spec_equipment_deb'] + df['erc_travel_day_deb'] + df[
'erc_travel_liveing_deb'] + df['erc_travel_transp_deb']
df['erc_total_cred'] = df['erc_cable_sum_deb'] + df['erc_cable_transp_deb'] + df['erc_semi_product_sum_deb'] + df[
'erc_semi_product_transp_deb'] + df['erc_accessory_mat_sum_deb'] + df['erc_accessory_mat_transp_deb'] + df[
'erc_recycle_waste_deb'] + df['erc_main_salary_deb'] + df['erc_addit_salary_deb'] + df[
'erc_uni_social_tax_deb'] + df['erc_production_salary_deb'] + df[
'erc_production_amort_deb'] + df['erc_production_others_deb'] + df[
'erc_household_salary_deb'] + df['erc_household_others_deb'] + df[
'erc_contractor_service_deb'] + df['erc_special_deb'] + df['erc_others_deb'] + df[
'erc_semi_fineshed_deb'] + df['erc_semi_fineshed_transp_deb'] + df[
'erc_components_deb'] + df['erc_container_deb'] + df['erc_spec_equipment_deb'] + df[
'erc_travel_day_deb'] + df['erc_travel_liveing_deb'] + df['erc_travel_transp_cred']
columns = df.columns.to_list()
deb_columns = [col for col in columns if col.endswith('_deb')]
cred_columns = [col for col in columns if col.endswith('_cred')]
for deb_column, cred_column in zip(deb_columns, cred_columns):
if cred_column.endswith('_cred'):
bal_col = cred_column.replace('_cred', '_bal')
df[bal_col] = 0
for i in range(len(df)):
if i == 0 or df.loc[i - 1, 'woo_state_title'] == 'закрыт':
df.loc[i, bal_col] = df.loc[i, deb_column] - df.loc[i, cred_column]
else:
if df.loc[i, 'woo_state_title'] == 'закрыт':
df.loc[i, bal_col] = 0
df.loc[i, deb_column] = df.loc[i, cred_column] - df.loc[i - 1, bal_col]
else:
df.loc[i, bal_col] = df.loc[i - 1, bal_col] + df.loc[i, deb_column] - df.loc[i, cred_column]
column_order = []
for col in columns:
if col.endswith('_cred'):
column_order.append(col)
column_order.append(col.replace('_cred', '_bal'))
else:
column_order.append(col)
df = df.round(1)
# df[column_order].to_csv('df.csv', encoding='cp1251', index=False)
print(f'batch with {size} rows is ready')
return df[column_order]
@task(dag=dag)
def dataframe_to_ch(arg_list):
woo_number_start, woo_number_end, start_date, end_date, size, table_name, connection_id = arg_list
client = PostgresHook(postgres_conn_id=connection_id)
start_time = datetime.now()
print(f'batch generation started at : {start_time}')
for woo_number in range(woo_number_start, woo_number_end):
df = pandas_workflow(woo_number, start_date, end_date, size)
client.insert_rows(table=table_name, rows=df.values.tolist())
end_time = datetime.now()
print(f'Batch with woo_numbers {woo_number} was generated. Time has passed: {end_time - start_time}')
@task(dag=dag)
def running_multiple_task(**kwargs):
task_list = []
start_date = kwargs['params'].get('start_date')
end_date = kwargs['params'].get('end_date')
table_name = kwargs['params'].get('table_name')
connection_id = kwargs['params'].get('connection_id')
amount_rows = kwargs['params'].get('amount_rows')
num_instances = kwargs['params'].get('num_instances')
min_woo_number = kwargs['params'].get('min_woo_number')
max_woo_number = kwargs['params'].get('max_woo_number')
num_per_process = (max_woo_number - min_woo_number) // num_instances
rows_per_woo_number = amount_rows // (max_woo_number - min_woo_number)
for i in range(num_instances):
min_instance_woo_number = min_woo_number + i * num_per_process
max_instance_woo_number = min_woo_number + (i + 1) * num_per_process
task_list.append(
[min_instance_woo_number, max_instance_woo_number, start_date, end_date, rows_per_woo_number, table_name,
connection_id])
return task_list
dataframe_to_ch.partial().expand(arg_list=running_multiple_task())
Для запуска необходимо нажать на кнопку с треугольником и заполнить параметры.
Параметры
После запуска необходимо заполнить поля с параметрами, исходя из которых будут сгенерированы данные. У каждого параметра есть описание и заполненное значение по умолчанию. Подробнее о параметрах:
- start_date - начиная с какой даты генерировать данные
- end_date - по какую дату генерировать значения
- table_name - название таблицы, куда сложить данные
- connection_id - id подключения в Apache Airflow
- amount_rows - общее количество строк которое необходимо сгенерировать
- num_instances - на сколько инстансов разбить задачу
- min_woo_number - начиная с какого номера генерировать значения
- max_woo_number - по какой номер генерировать значения
Исходя из полученных значений рассчитывается сколько строк необходимо на каждый woo_number, список всех woo_number, а также сколько строк должен сгенерировать каждый таск (num_instances).
Функции
В скрипте функции принимают на себя полученные из переменных аргументы, и исходя из этого рассчитывают показатели для каждой колонки.
zero_probability_generation
Вероятность получения 0
Принимает на себя в качестве аргумента число от 0 до 1 (вероятность). Генерирует случайное число в этом диапазоне, и сравнивает с принятым числом, и исходя из результата возвращает False или True.
date_generation
Генератор даты
Принимает на себя в качестве аргумента 2 даты в формате datetime, и генерирует случайную дату между ними. Эту случайную дату и возвращает.
generate_all_dates
Генератор всех дат для итоговой таблицы.
Принимает на себя в качестве аргумента 2 даты в формате строк в формате ‘%Y-%m-%d %H:%M:%S’ и число, обозначающее количество строк, которое необходимо сгенерировать.
Данная функция использует функцию date_generation для генерации дат. Также учитывает специфику проекта по датам.
Возвращает 3 списка сгенерированных дат.
columns_generation
Генератор колонок
Принимает на себя в качестве аргумента номер для колонки woo_number, 2 даты в формате строк в формате ‘%Y-%m-%d %H:%M:%S’ и число, обозначающее количество строк, которое необходимо сгенерировать.
Для получения колонок дат обращается к функции generate_all_dates, генерирует остальные колонки внутри себя.
Каждая колонка - список значений.
Затем преобразует все в словарь, где каждому ключу соответствует ранее сгенерированный список значений. Название каждого ключа должно совпадать с наименованием колонок заранее созданной таблицы.
Возвращает полученный словарь.
pandas_workflow
Рабочее пространство пандаса.
Принимает на себя в качестве аргумента номер для колонки woo_number, 2 даты в формате строк в формате ‘%Y-%m-%d %H:%M:%S’ и число, обозначающее количество строк, которое необходимо сгенерировать.
Получает словарь из функции columns_generation. Преобразовывает данный словарь в формат пандаса, производит вычисления согласно специфики проекта.
Формирует список из названия колонок в нужном порядке. Возвращает датафрейм с колонками в нужном порядке.
dataframe_to_ch
Датафрейм в БД
Принимает на себя в качестве аргумента список из значений полученных при заполнении параметров. Формирует датафреймы с помощью функции pandas_workflow, и записывает их в БД.
running_multiple_task
Запуск нескольких заданий
Принимает на себя произвольное число именованных аргументов, полученных из Airflow. Исходя из полученных аргументов вычисляет количество и диапазон номеров на каждый инстанс. Полученные значения складывает в список для дальнейшего запуска.