Apache Airflow ในปี 2026: การจัดการ Pipeline, DAG และคำถามสัมภาษณ์งาน Data Engineering

คู่มือ Apache Airflow 3.2 ฉบับสมบูรณ์: การเขียน DAG ด้วย Task SDK, รูปแบบการจัดการ data pipeline, asset partition, dynamic task mapping, native async และคำถามสัมภาษณ์งานสำหรับวิศวกรข้อมูลในปี 2026

Apache Airflow pipeline orchestration DAGs tutorial 2026

Apache Airflow 3.2 ถือเป็นการอัปเดตครั้งสำคัญที่สุดของแพลตฟอร์ม orchestration แบบ open-source นับตั้งแต่การปรับโครงสร้างสถาปัตยกรรมใหญ่ในเวอร์ชัน 3.0 ด้วยยอดดาวน์โหลดบน PyPI มากกว่า 10 ล้านครั้งต่อเดือน และการนำไปใช้งานจริงในองค์กรระดับ Airbnb ไปจนถึง Spotify ทำให้ Airflow ยังคงเป็นเครื่องมือหลักสำหรับการสร้าง กำหนดเวลา และตรวจสอบ data pipeline บทความนี้ครอบคลุมการเขียน DAG ด้วย Task SDK รูปแบบการจัดการ pipeline และคำถามสัมภาษณ์ที่พบบ่อยในตำแหน่ง data engineering

ภาพรวม Airflow 3.2

Airflow 3.2 เปิดตัวในเดือนเมษายน 2026 นำเสนอ asset partition สำหรับการจัดตารางเวลาแบบ data-aware ในระดับพาร์ทิชัน, native async support ใน PythonOperator และ multi-team deployment การ import DAG ทั้งหมดใช้ namespace airflow.sdk ที่เสถียรซึ่งเปิดตัวครั้งแรกใน Airflow 3.0

การเขียน DAG ด้วย Airflow Task SDK

Airflow 3.0 เปิดตัว Task SDK ซึ่งเป็นแพ็กเกจแยกที่แยกการกำหนด DAG ออกจากส่วนภายในของ Airflow เป้าหมายหลักคือการเขียน DAG ที่พกพาได้และมีเวอร์ชันที่เสถียร สามารถอัปเกรด Airflow ได้โดยไม่ต้องแก้ไขโค้ด ออบเจกต์หลักทั้งหมด ได้แก่ DAG, dag, task, BaseOperator, Connection และ Variable อยู่ภายใต้ airflow.sdk

เส้นทาง import แบบเก่า (airflow.decorators.task, airflow.models.dag.DAG) ยังใช้งานได้ในเวอร์ชัน 3.2 แต่ถูกทำเครื่องหมายว่า deprecated และจะถูกลบออกในเวอร์ชันถัดไป

python
# etl_daily_revenue.py
import pendulum
from airflow.sdk import dag, task

@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["finance", "etl"],
)
def etl_daily_revenue():
    @task()
    def extract_transactions() -> list[dict]:
        # Pulls raw transactions from the payments API
        import requests
        response = requests.get("https://api.internal/payments/daily")
        return response.json()["transactions"]

    @task(multiple_outputs=True)
    def transform(transactions: list[dict]) -> dict:
        # Aggregates by currency and computes totals
        totals = {}
        for tx in transactions:
            currency = tx["currency"]
            totals[currency] = totals.get(currency, 0) + tx["amount"]
        return {"totals": totals, "count": len(transactions)}

    @task()
    def load(totals: dict, count: int):
        # Inserts aggregated row into the warehouse
        from warehouse import insert_revenue_summary
        insert_revenue_summary(totals=totals, transaction_count=count)

    raw = extract_transactions()
    result = transform(raw)
    load(result["totals"], result["count"])

etl_daily_revenue()

@dag decorator ทำหน้าที่แทน context manager แบบเดิม with DAG(...) ส่วน @task decorator แปลงฟังก์ชัน Python ธรรมดาให้เป็น Airflow task โดย XCom serialization ถูกจัดการโดยอัตโนมัติ การเรียก transform(raw) เป็นการประกาศ dependency โดย Airflow จะสร้าง DAG edge จากกราฟการเรียกฟังก์ชันโดยอัตโนมัติ

Dynamic Task Mapping สำหรับ Workload ที่เปลี่ยนแปลง

DAG แบบ static จะมีปัญหาเมื่อจำนวนรายการที่ต้องประมวลผลเปลี่ยนแปลงระหว่างแต่ละรอบการทำงาน Dynamic task mapping ซึ่งเสถียรตั้งแต่ Airflow 2.4 และได้รับการปรับปรุงใน 3.x แก้ปัญหานี้ด้วยการขยาย task ณ runtime ผ่านเมธอด .expand()

python
# etl_multi_region.py
import pendulum
from airflow.sdk import dag, task

@dag(
    schedule="@hourly",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["regional", "etl"],
)
def etl_multi_region():
    @task()
    def get_regions() -> list[str]:
        # Returns the active regions from config
        return ["us-east-1", "eu-west-1", "ap-southeast-1"]

    @task()
    def process_region(region: str) -> dict:
        # Processes events for a single region
        from data_processor import aggregate_events
        return aggregate_events(region)

    @task()
    def merge_results(results: list[dict]):
        # Combines all regional outputs into a single report
        from reporter import publish_global_report
        publish_global_report(results)

    regions = get_regions()
    # .expand() creates one mapped task instance per region
    processed = process_region.expand(region=regions)
    merge_results(processed)

etl_multi_region()

เมื่อถึงเวลาทำงาน Airflow จะสร้าง task instance ของ process_region จำนวน 3 ชุดแบบขนาน หนึ่งชุดต่อหนึ่ง region หาก region ที่สี่ถูกเพิ่มเข้ามาในวันถัดไป ไม่จำเป็นต้องแก้ไขโค้ด DAG แต่อย่างใด task merge_results จะรอจนกว่า mapped instance ทั้งหมดจะเสร็จสิ้นก่อนจึงจะเริ่มทำงาน

ขีดจำกัดของ mapped task

ตามค่าเริ่มต้น Airflow จำกัด mapped task ไว้ที่ 1024 instance ต่อหนึ่ง mapping สามารถปรับค่านี้ได้ด้วย max_map_length ในการตั้งค่า DAG เมื่อต้องประมวลผลชุดข้อมูลขนาดใหญ่

Asset Partition: การจัดตารางเวลาแบบ Data-Aware ใน Airflow 3.2

ก่อน Airflow 3.2 การจัดตารางเวลาแบบ data-aware ทำงานในระดับ asset เท่านั้น กล่าวคือเมื่อ producer DAG อัปเดต asset ใดก็ตาม consumer DAG ทั้งหมดจะถูก trigger โดยไม่คำนึงว่าส่วนไหนของข้อมูลที่เปลี่ยนแปลง Asset partition แก้ปัญหานี้โดยเปิดใช้งานความละเอียดในระดับพาร์ทิชัน

พิจารณาสถานการณ์ที่ upstream DAG สามตัวผลิตสถิติผู้เล่นรายชั่วโมงสำหรับลีกกีฬาแต่ละประเภท downstream analytics DAG ควร trigger เฉพาะเมื่อลีกทั้งสามเผยแพร่ข้อมูลสำหรับชั่วโมงเดียวกันเท่านั้น

python
# downstream_analytics.py
import pendulum
from airflow.sdk import dag, task
from airflow.timetables.assets import CronPartitionTimetable

# Define partitioned assets with hourly granularity
nba_stats = Asset("s3://datalake/nba/hourly/", partitions=CronPartitionTimetable("0 * * * *"))
epl_stats = Asset("s3://datalake/epl/hourly/", partitions=CronPartitionTimetable("0 * * * *"))
nfl_stats = Asset("s3://datalake/nfl/hourly/", partitions=CronPartitionTimetable("0 * * * *"))

@dag(
    # Triggers only when all three assets have matching partition
    schedule=(nba_stats & epl_stats & nfl_stats),
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
)
def unified_sports_analytics():
    @task()
    def aggregate_all_leagues(**context):
        partition = context["partition"]
        # Process only the specific hourly slice across all leagues
        from analytics import build_cross_league_report
        build_cross_league_report(partition=partition)

    aggregate_all_leagues()

unified_sports_analytics()

การจัดตารางเวลาแบบ partition ช่วยขจัดการทำงานของ pipeline ที่ซ้ำซ้อน downstream DAG จะทำงานเฉพาะเมื่อพาร์ทิชันที่ต้องการพร้อมใช้งานจาก upstream source ทั้งหมดเท่านั้น ไม่ทำงานก่อนกำหนดหรือเมื่อข้อมูลยังไม่ครบถ้วน

พร้อมที่จะพิชิตการสัมภาษณ์ Data Engineering แล้วหรือยังครับ?

ฝึกฝนด้วยตัวจำลองแบบโต้ตอบ, flashcards และแบบทดสอบเทคนิคครับ

Native Async Task สำหรับงานที่เน้น I/O

Airflow 3.2 เพิ่ม native async support ใน PythonOperator ก่อนหน้านี้ การรัน I/O operation แบบ concurrent (เช่น การเรียก API หลายพันครั้ง หรือการดาวน์โหลดไฟล์จำนวนมาก) จำเป็นต้องเขียน custom deferrable operator ตอนนี้สามารถส่งฟังก์ชัน async ได้โดยตรง

python
# async_file_download.py
import asyncio
import aiohttp
from airflow.sdk import dag, task
import pendulum

@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
    tags=["async", "download"],
)
def async_file_download():
    @task()
    async def download_files():
        urls = [f"https://data.source/files/{i}.csv" for i in range(500)]
        async with aiohttp.ClientSession() as session:
            # Downloads 500 files concurrently instead of sequentially
            tasks = [fetch_and_save(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        return {"downloaded": len(results)}

    download_files()

async def fetch_and_save(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as resp:
        content = await resp.read()
        filename = url.split("/")[-1]
        with open(f"/data/downloads/{filename}", "wb") as f:
            f.write(content)
        return filename

async_file_download()

แนวทาง async ดาวน์โหลดไฟล์ 500 ไฟล์พร้อมกันผ่าน worker slot เดียว เปรียบเทียบกับการเรียก HTTP แบบ sequential จำนวน 500 ครั้งในเวอร์ชัน synchronous สำหรับ workload ที่เป็น I/O-bound วิธีนี้ช่วยเพิ่มความเร็วได้หลายเท่าโดยไม่ต้องเพิ่ม worker

สถาปัตยกรรม Airflow: ส่วนประกอบหลักและลำดับการทำงาน

การทำความเข้าใจสถาปัตยกรรมของ Airflow มีความจำเป็นทั้งสำหรับการใช้งานจริงใน production และการตอบคำถามสัมภาษณ์ ส่วนประกอบห้าส่วนทำงานร่วมกันในทุก DAG run ดังนี้

  • Scheduler -- ทำการ parse ไฟล์ DAG, แก้ไข dependency และจัดคิว task สำหรับการประมวลผล ใน Airflow 3.x scheduler ทำงานแบบ stateless กับ metadata database
  • Executor -- กำหนดว่า task จะทำงานที่ไหน LocalExecutor จัดการการตั้งค่าแบบเครื่องเดียว CeleryExecutor กระจายงานไปยัง worker node หลายตัว KubernetesExecutor สร้าง pod ใหม่สำหรับแต่ละ task
  • Workers -- ประมวลผลโค้ดของ task จริง ด้วย Task Execution API ใหม่ใน Airflow 3.0 worker สื่อสารผ่าน contract ที่เสถียร ทำให้สามารถประมวลผลใน container, edge environment หรือ external runtime ได้
  • Metadata database -- PostgreSQL (แนะนำ) หรือ MySQL เก็บข้อมูล DAG definition, task state, ค่า XCom, connection credential และ audit log
  • Web server -- Airflow UI สำหรับตรวจสอบ DAG run, ดู log, trigger การทำงานด้วยตนเอง และจัดการ connection
การเลือก executor สำหรับ production

หลีกเลี่ยง SequentialExecutor ใน production เนื่องจากรัน task ทีละตัวเท่านั้น และมีไว้สำหรับการพัฒนาเท่านั้น สำหรับสภาพแวดล้อมที่ใช้ Kubernetes KubernetesExecutor ให้การแยกส่วนที่แข็งแกร่งที่สุด เนื่องจากแต่ละ task จะได้รับ pod ของตนเองพร้อม resource และ dependency ที่เป็นอิสระ

เปรียบเทียบ Airflow vs. Prefect vs. Dagster ในปี 2026

Airflow แข่งขันกับทางเลือกสำคัญอีกสองตัว การเลือกที่เหมาะสมขึ้นอยู่กับขนาดของทีม โครงสร้างพื้นฐานที่มีอยู่ และประเภทของ pipeline ที่ต้องการสร้าง

| Feature | Airflow 3.2 | Prefect 3.x | Dagster 1.9 | |---|---|---|---| | DAG definition | Python decorators (airflow.sdk) | Python decorators (@flow, @task) | Python decorators (@asset, @op) | | Scheduling | Cron, asset-aware, partition-aware | Cron, event-driven | Cron, sensor-based, asset-aware | | Execution model | Centralized scheduler + distributed workers | Hybrid (server + work pools) | Centralized dagster-daemon | | Dynamic tasks | .expand() mapped tasks | Native Python loops | Dynamic partitions | | Async support | Native in 3.2 | Native since 2.0 | Async I/O ops | | Multi-team isolation | Built-in (3.2 experimental) | Workspace-based (Cloud) | Branch deployments | | Community size | Largest (35k+ GitHub stars) | Growing (18k+ stars) | Growing (12k+ stars) | | Best fit | Complex, multi-team pipelines with established infrastructure | Smaller teams wanting fast iteration | Data asset-centric organizations |

ข้อได้เปรียบหลักของ Airflow อยู่ที่ ecosystem ด้วย provider package มากกว่า 80 ตัวที่ครอบคลุม cloud service, database และ API หลักทุกเจ้า Prefect โดดเด่นด้านประสบการณ์ของนักพัฒนาที่ต้องเขียนโค้ดน้อยกว่า ส่วนโมเดลที่เน้น asset ของ Dagster เหมาะกับทีมที่คิดในเชิง data product มากกว่าลำดับของ task

คำถามสัมภาษณ์: Apache Airflow สำหรับ Data Engineer

คำถามต่อไปนี้สะท้อนสิ่งที่ผู้จัดการฝ่ายรับสมัครและวิศวกรอาวุโสถามในการสัมภาษณ์ตำแหน่ง data engineering ในบริษัทที่ใช้ Airflow ใน production

DAG คืออะไร และ Airflow ใช้งานอย่างไร?

DAG (Directed Acyclic Graph) กำหนด workflow เป็นชุดของ task ที่มี dependency โดยรับประกันว่าไม่มีการอ้างอิงแบบวนรอบ Airflow จะ parse ไฟล์ Python ในโฟลเดอร์ dags/ สร้างกราฟ dependency และ scheduler จะกำหนดลำดับการประมวลผล แต่ละ DAG run จะสร้างออบเจกต์ DagRun ที่เชื่อมโยงกับ logical date คุณสมบัติ "acyclic" รับประกันว่า scheduler จะหาลำดับการทำงานที่ถูกต้องได้เสมอ task A ทำงานก่อน task B โดยไม่มีการย้อนกลับ

XCom ทำงานอย่างไร และเมื่อไหร่ควรหลีกเลี่ยง?

XCom (cross-communication) ส่งข้อมูลขนาดเล็กระหว่าง task หนึ่ง task push ค่า return ไปยัง XCom จากนั้น downstream task จะ pull ค่ามาใช้ Task SDK จัดการกระบวนการนี้โดยอัตโนมัติเมื่อส่งค่า return ระหว่าง decorated task XCom เก็บข้อมูลใน metadata database ตามค่าเริ่มต้น จึงไม่เหมาะสำหรับชุดข้อมูลขนาดใหญ่ (ข้อมูลเกินไม่กี่ KB) สำหรับการถ่ายโอนข้อมูลขนาดใหญ่ ควรใช้ external storage (S3, GCS) แล้วส่งเฉพาะ reference path ผ่าน XCom

อธิบายความแตกต่างระหว่าง schedule, start_date และ catchup

พารามิเตอร์ schedule (เปลี่ยนชื่อจาก schedule_interval ใน Airflow 3.x) กำหนดความถี่ในการรัน DAG ซึ่งอาจเป็น cron string, timedelta, timetable object หรือ asset trigger start_date กำหนด logical date แรกสุดที่สามารถสร้าง DAG run ได้ catchup=True (ค่าเริ่มต้น) จะสร้าง DAG run สำหรับทุกช่วงเวลาที่พลาดไประหว่าง start_date กับเวลาปัจจุบัน การตั้ง catchup=False บอก scheduler ให้ข้ามช่วงเวลาที่ผ่านไปแล้วและจัดตารางเฉพาะจากเวลาปัจจุบันเป็นต้นไป รูปแบบที่ใช้บ่อยใน production: ตั้ง catchup=False สำหรับ operational DAG และ catchup=True สำหรับ historical backfill

KubernetesExecutor แตกต่างจาก CeleryExecutor อย่างไร?

CeleryExecutor ดูแล pool ของ worker process ที่ทำงานต่อเนื่อง เชื่อมต่อผ่าน message broker (Redis หรือ RabbitMQ) task จะถูกจัดคิวและประมวลผลบน worker ที่ว่างอยู่ KubernetesExecutor สร้าง Kubernetes pod ใหม่สำหรับแต่ละ task โดยใช้ Docker image และ resource requirement ของ task นั้น Celery ให้ latency ต่ำกว่า (ไม่มี pod startup overhead) และทำงานได้ดีกับ workload ที่เป็นแบบเดียวกัน KubernetesExecutor ให้การแยกส่วนที่แข็งแกร่งกว่า ควบคุม resource ระดับ task ได้ และไม่ต้องจัดการ worker pool แบบคงที่ ซึ่งเหมาะสำหรับ workload ที่หลากหลายซึ่ง task มี dependency และความต้องการหน่วยความจำที่แตกต่างกัน

มีกลยุทธ์อะไรบ้างในการจัดการ task failure?

Airflow มีกลไกจัดการ failure หลายรูปแบบ retries และ retry_delay กำหนดการ retry อัตโนมัติพร้อม exponential backoff on_failure_callback trigger custom logic (เช่น Slack alert, PagerDuty incident) เมื่อ task ล้มเหลว trigger_rule ควบคุมว่า downstream task ตอบสนองอย่างไร ได้แก่ all_success (ค่าเริ่มต้น), one_success, all_failed หรือ none_failed_min_one_success สำหรับปัญหา infrastructure ชั่วคราว พารามิเตอร์ retry_exponential_backoff=True จะเพิ่มเวลารอระหว่าง retry SLA (ซึ่งในเวอร์ชัน 3.x เปลี่ยนชื่อเป็น Deadline Alerts) ตรวจสอบระยะเวลาการประมวลผลและ fire callback เมื่อ task ใช้เวลาเกินที่กำหนด

แนวปฏิบัติที่ดีสำหรับ Airflow ใน Production

การรัน Airflow ให้เสถียรในระดับ scale ต้องใส่ใจกับรูปแบบการปฏิบัติงานหลายอย่างที่เกินขอบเขตของการเขียน DAG ที่ถูกต้อง

Task แบบ idempotent ทุก task ควรให้ผลลัพธ์เดียวกันเมื่อถูกประมวลผลหลายครั้งด้วย input เดียวกัน ควรใช้ INSERT ... ON CONFLICT หรือ MERGE แทน INSERT ธรรมดา แบ่ง output data ตาม logical date วิธีนี้รับประกันความปลอดภัยในการ retry และ backfill โดยไม่เกิดข้อมูลซ้ำ

DAG ขนาดเล็กที่มุ่งเน้น หลีกเลี่ยงการสร้าง DAG ขนาดใหญ่ที่มี task หลายสิบตัว ควรแยก pipeline ที่ซับซ้อนออกเป็นหลาย DAG ที่เชื่อมต่อกันผ่าน asset (เดิมเรียกว่า dataset) DAG ขนาดเล็ก parse ได้เร็วกว่า debug ง่ายกว่า และอนุญาตให้รีสตาร์ท pipeline บางส่วนได้

การจัดการ connection เก็บ credential ทั้งหมดใน connection manager ของ Airflow หรือ external secrets backend (AWS Secrets Manager, HashiCorp Vault) ห้าม hardcode credential ในไฟล์ DAG โดยเด็ดขาด Airflow 3.x เข้ารหัส connection field ใน metadata database แบบ at rest

การตรวจสอบและแจ้งเตือน ส่ง Airflow metric ไปยัง Prometheus หรือ StatsD ติดตาม scheduler_heartbeat, dag_processing.total_parse_time และ executor.queued_tasks Airflow 3.2 เพิ่ม OpenTelemetry trace สำหรับการ observe pipeline แบบ end-to-end

สำหรับการเตรียมตัวสัมภาษณ์งาน data engineering ที่ครอบคลุม pipeline orchestration SharpSkill มีคำถามสัมภาษณ์ Airflow และโมดูลฝึกปฏิบัติที่จำลองสถานการณ์จริงจากสภาพแวดล้อม production

เริ่มฝึกซ้อมเลย!

ทดสอบความรู้ของคุณด้วยตัวจำลองสัมภาษณ์และแบบทดสอบเทคนิคครับ

สรุป

  • Airflow 3.2 เปิดตัว Task SDK (airflow.sdk) เป็น API ที่เสถียรสำหรับการเขียน DAG ควรย้าย import มาใช้ตอนนี้เพื่อหลีกเลี่ยง breaking change ในเวอร์ชันถัดไป
  • Asset partition ช่วยให้จัดตารางเวลาแบบ data-aware ในระดับพาร์ทิชันได้ ขจัดการ trigger pipeline ที่ซ้ำซ้อนเมื่อข้อมูลอัปเดตเพียงบางส่วน
  • Native async support ใน PythonOperator จัดการ workload ที่เน้น I/O (เช่น batch download, API fan-out) ได้โดยไม่ต้องเขียน custom deferrable operator
  • Dynamic task mapping ด้วย .expand() ปรับตัวตามขนาด workload ที่เปลี่ยนแปลงณ runtime โดยไม่ต้องแก้ไขโค้ด DAG
  • การเตรียมตัวสัมภาษณ์ควรครอบคลุมกลไกของ DAG, ข้อแลกเปลี่ยนระหว่าง executor (Celery vs. Kubernetes), ข้อจำกัดของ XCom และรูปแบบ idempotency
  • การ deploy ใน production ได้ประโยชน์จาก DAG ขนาดเล็กที่เป็น idempotent, การจัดการ secret ภายนอก และการส่ง metric ไปยังแพลตฟอร์ม observability

เริ่มฝึกซ้อมเลย!

ทดสอบความรู้ของคุณด้วยตัวจำลองสัมภาษณ์และแบบทดสอบเทคนิคครับ

แท็ก

#Apache Airflow
#DAG
#Data Pipeline
#Data Engineering
#Python

แชร์

บทความที่เกี่ยวข้อง

Apache Spark 4 new features and structured streaming

Apache Spark 4: ฟีเจอร์ใหม่ Structured Streaming และคำถามสัมภาษณ์งาน

สำรวจฟีเจอร์สำคัญใน Apache Spark 4 รวมถึง ANSI SQL Mode, VARIANT data type, Real-Time Mode streaming และ transformWithState API พร้อมตัวอย่างโค้ดและคำถามสัมภาษณ์งานที่พบบ่อย

ภาพประกอบบทเรียน data pipeline Apache Spark กับ Python แสดงการไหลของข้อมูลและขั้นตอนการประมวลผล

Apache Spark กับ Python: สร้าง Data Pipeline ทีละขั้นตอน

บทเรียน PySpark ภาคปฏิบัติครอบคลุมการทำงานกับ DataFrame การสร้าง ETL pipeline และฟีเจอร์ของ Spark 4.0 พร้อมตัวอย่างโค้ดพร้อมใช้งานจริงสำหรับ data engineer ที่กำลังเตรียมสอบสัมภาษณ์เชิงเทคนิค

dbt data transformations testing interview 2026

dbt ในปี 2026: การแปลงข้อมูล การทดสอบ และคำถามสัมภาษณ์งาน

คู่มือ dbt สำหรับวิศวกรข้อมูล: การแปลง SQL, การสร้างโมเดลแบบแบ่งชั้น, กลยุทธ์ incremental, การทดสอบคุณภาพข้อมูล และคำถามสัมภาษณ์พร้อมตัวอย่างโค้ดสำหรับปี 2026