# Standard library
import json
from datetime import datetime, timedelta

# Third-party
import boto3

# Airflow core
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonVirtualenvOperator
from airflow.utils.trigger_rule import TriggerRule

# Airflow AWS provider integrations
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.hooks.sagemaker import SageMakerHook
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTrainingOperator
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTransformOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# Define the name of the DAG
dag_name = 'cross_account_data_processing'

# Unique identifier for the DAG run (Jinja template rendered by Airflow at runtime)
correlation_id = "{{ run_id }}"

# Default arguments applied to every task in the DAG.
# NOTE: airflow.utils.dates.days_ago() is deprecated in Airflow 2.x and a
# dynamic start_date is a scheduler anti-pattern, so a fixed datetime is used
# instead (behavior is unchanged for this manually-triggered DAG). The old
# 'provide_context' flag was removed as well: it is an Airflow 1.x setting
# that Airflow 2.x ignores, since task context is always passed to callables.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
}

# Instantiate the DAG; schedule=None means runs are triggered manually only.
dag = DAG(
    dag_name,
    default_args=default_args,
    schedule=None,
    dagrun_timeout=timedelta(hours=2),  # abort any run exceeding two hours
)

# Target S3 bucket for the pipeline — the data-processing bucket created by
# the "DPML_AccountB_Setup.yaml" stackset in us-east-1. Replace before use.
bucket_name = "<INSERT-DATA-PROCESSING-BUCKET-NAME-US-EAST-1>"

# Wait until the raw training data object lands in S3 before the pipeline
# continues, polling through the cross-account role connection.
s3_sensor = S3KeySensor(
    task_id='s3_sensor',
    dag=dag,
    aws_conn_id='aws_crossaccount_role_conn_east1',  # cross-account role, us-east-1
    bucket_name=bucket_name,  # parameterized bucket defined above
    bucket_key='raw/ml_train_data.csv',
    poke_interval=60,  # re-check once per minute
    timeout=3600,      # give up after one hour of polling
)

# Launch the Glue job that preprocesses the raw data and writes the
# transformed output, passing the bucket name through as a job argument.
glue_preprocess_task = GlueJobOperator(
    task_id='glue_preprocess_task',
    dag=dag,
    job_name='mwaa_glue_raw_to_transform',  # Glue job to execute
    aws_conn_id='aws_crossaccount_role_conn_east1',  # cross-account AWS access
    script_args={'--bucket_name': bucket_name},  # parameterized bucket
)

# Wiring: Glue preprocessing starts only after the sensor sees the raw file.
s3_sensor.set_downstream(glue_preprocess_task)

