Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/newsfragments/69130.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Enforce TCP/UDP port range validation (1-65535) across the Connection model, API schemas, Task SDK, and CLI.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ class ConnectionBody(StrictBaseModel):
host: str | None = Field(default=None)
login: str | None = Field(default=None)
schema_: str | None = Field(None, alias="schema")
port: int | None = Field(default=None)
port: int | None = Field(default=None, ge=1, le=65535)
password: str | None = Field(default=None)
extra: str | None = Field(default=None)
team_name: str | None = Field(max_length=50, default=None)
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ def string_lower_type(val):
ARG_CONN_SCHEMA = Arg(
("--conn-schema",), help="Connection schema, optional when adding a connection", type=str
)
ARG_CONN_PORT = Arg(("--conn-port",), help="Connection port, optional when adding a connection", type=str)
ARG_CONN_PORT = Arg(("--conn-port",), help="Connection port (1-65535), optional when adding a connection", type=int)
ARG_CONN_EXTRA = Arg(
("--conn-extra",), help="Connection `Extra` field, optional when adding a connection", type=str
)
Expand Down
20 changes: 19 additions & 1 deletion airflow-core/src/airflow/models/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from urllib.parse import parse_qsl, quote, unquote, urlencode, urlsplit

from sqlalchemy import ForeignKey, Integer, String, Text, select
from sqlalchemy.orm import Mapped, mapped_column, reconstructor
from sqlalchemy.orm import Mapped, mapped_column, reconstructor, validates

from airflow._shared.module_loading import import_string
from airflow._shared.secrets_backend.base import call_secrets_backend_method
Expand Down Expand Up @@ -90,6 +90,19 @@ def sanitize_conn_id(conn_id: str | None, max_length=CONN_ID_MAX_LEN) -> str | N
return res.group(0)


def validate_port(port: int | None) -> int | None:
"""
Validate that port is within the valid TCP/UDP range (1-65535).

:param port: The port number to validate.
:return: The port number if valid, None if port is None.
:raises ValueError: If port is outside the valid range.
"""
if port is not None and not (1 <= port <= 65535):
raise ValueError(f"Port must be between 1 and 65535, got {port}")
return port


def _parse_netloc_to_hostname(uri_parts):
"""
Parse a URI string to get the correct Hostname.
Expand Down Expand Up @@ -202,6 +215,10 @@ def __init__(
mask_secret(quote(self.password))
self.team_name = team_name

@validates("port")
def _validate_port(self, _key: str, value: int | None) -> int | None:
return validate_port(value)

@staticmethod
def _validate_extra(extra, conn_id) -> None:
"""Verify that ``extra`` is a JSON-encoded Python dict."""
Expand Down Expand Up @@ -591,6 +608,7 @@ def from_json(cls, value, conn_id=None) -> Connection:
kwargs["port"] = int(port)
except ValueError:
raise ValueError(f"Expected integer value for `port`, but got {port!r} instead.")
validate_port(kwargs["port"])
return Connection(conn_id=conn_id, **kwargs)

def as_json(self) -> str:
Expand Down
6 changes: 6 additions & 0 deletions airflow-core/src/airflow/models/connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ def __init__(
self.token = secrets.token_urlsafe(32)
self.state = ConnectionTestState.PENDING

@validates("port")
def _validate_port(self, _key: str, value: int | None) -> int | None:
from airflow.models.connection import validate_port

return validate_port(value)

@validates("state")
def _sync_active_connection_id(
self, _key: str, value: str | ConnectionTestState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2189,3 +2189,38 @@ def test_post_should_fail_with_non_json_object_as_extra(
"method": "POST",
},
)


class TestConnectionBodyPortValidation:
"""Tests for port range validation on REST API ConnectionBody (issue #68382)."""

def test_connection_body_rejects_port_too_high(self):
"""ConnectionBody should reject port > 65535."""
from pydantic import ValidationError

with pytest.raises(ValidationError, match="less than or equal to 65535"):
ConnectionBody(connection_id="test", conn_type="http", port=99999)

def test_connection_body_rejects_port_zero(self):
"""ConnectionBody should reject port=0."""
from pydantic import ValidationError

with pytest.raises(ValidationError, match="greater than or equal to 1"):
ConnectionBody(connection_id="test", conn_type="http", port=0)

def test_connection_body_rejects_negative_port(self):
"""ConnectionBody should reject negative port."""
from pydantic import ValidationError

with pytest.raises(ValidationError, match="greater than or equal to 1"):
ConnectionBody(connection_id="test", conn_type="http", port=-1)

def test_connection_body_accepts_valid_port(self):
"""ConnectionBody should accept valid port."""
body = ConnectionBody(connection_id="test", conn_type="http", port=8080)
assert body.port == 8080

def test_connection_body_accepts_none_port(self):
"""ConnectionBody should accept port=None."""
body = ConnectionBody(connection_id="test", conn_type="http", port=None)
assert body.port is None
87 changes: 87 additions & 0 deletions airflow-core/tests/unit/models/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,90 @@ def test_get_conn_id_to_team_name_mapping(self, testing_team: Team, session: Ses
"test_conn2": None,
}
clear_db_connections()


class TestConnectionPortValidation:
"""Tests for port range validation (issue #68382)."""

@pytest.mark.parametrize(
"port",
[
1,
80,
443,
8080,
5432,
3306,
65535,
],
)
def test_validate_port_accepts_valid_ports(self, port):
"""Valid TCP/UDP ports (1-65535) should be accepted."""
from airflow.models.connection import validate_port

assert validate_port(port) == port

def test_validate_port_accepts_none(self):
"""None (no port) should be accepted."""
from airflow.models.connection import validate_port

assert validate_port(None) is None

@pytest.mark.parametrize(
"port",
[
0,
-1,
-100,
65536,
99999999,
100000,
],
)
def test_validate_port_rejects_invalid_ports(self, port):
"""Ports outside range 1-65535 should be rejected."""
from airflow.models.connection import validate_port

with pytest.raises(ValueError, match="Port must be between 1 and 65535"):
validate_port(port)

def test_connection_init_rejects_invalid_port(self):
"""Connection() constructor should reject invalid port."""
with pytest.raises(ValueError, match="Port must be between 1 and 65535"):
Connection(conn_id="test", conn_type="http", port=99999)

def test_connection_init_rejects_zero_port(self):
"""Connection() constructor should reject port=0."""
with pytest.raises(ValueError, match="Port must be between 1 and 65535"):
Connection(conn_id="test", conn_type="http", port=0)

def test_connection_init_rejects_negative_port(self):
"""Connection() constructor should reject negative port."""
with pytest.raises(ValueError, match="Port must be between 1 and 65535"):
Connection(conn_id="test", conn_type="http", port=-1)

def test_connection_init_accepts_valid_port(self):
"""Connection() constructor should accept valid port."""
conn = Connection(conn_id="test", conn_type="http", port=8080)
assert conn.port == 8080

def test_connection_init_accepts_none_port(self):
"""Connection() constructor should accept port=None."""
conn = Connection(conn_id="test", conn_type="http", port=None)
assert conn.port is None

def test_connection_from_json_rejects_invalid_port(self):
"""Connection.from_json() should reject invalid port."""
import json

conn_json = json.dumps({"conn_type": "http", "port": 99999})
with pytest.raises(ValueError, match="Port must be between 1 and 65535"):
Connection.from_json(conn_json, conn_id="test")

def test_connection_from_json_accepts_valid_port(self):
"""Connection.from_json() should accept valid port."""
import json

conn_json = json.dumps({"conn_type": "http", "port": 5432})
conn = Connection.from_json(conn_json, conn_id="test")
assert conn.port == 5432
5 changes: 5 additions & 0 deletions task-sdk/src/airflow/sdk/definitions/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class Connection:
port: int | None = None
extra: str | None = None

@port.validator
def _validate_port(self, attribute, value):
if value is not None and not (1 <= value <= 65535):
raise ValueError(f"Port must be between 1 and 65535, got {value}")

EXTRA_KEY = "__extra__"

@overload
Expand Down
21 changes: 21 additions & 0 deletions task-sdk/tests/task_sdk/definitions/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,24 @@ def test_from_uri_roundtrip(self):
original_extra = json.loads(conn_from_original.extra)
roundtrip_extra = json.loads(conn_from_roundtrip.extra)
assert original_extra == roundtrip_extra


class TestConnectionPortValidation:
"""Tests for port range validation in Task SDK Connection (issue #68382)."""

@pytest.mark.parametrize("port", [1, 80, 443, 8080, 5432, 65535])
def test_accepts_valid_ports(self, port):
"""Valid TCP/UDP ports (1-65535) should be accepted."""
conn = Connection(conn_id="test", conn_type="http", port=port)
assert conn.port == port

def test_accepts_none_port(self):
"""None (no port) should be accepted."""
conn = Connection(conn_id="test", conn_type="http", port=None)
assert conn.port is None

@pytest.mark.parametrize("port", [0, -1, -100, 65536, 99999999])
def test_rejects_invalid_ports(self, port):
"""Ports outside range 1-65535 should be rejected."""
with pytest.raises(ValueError, match="Port must be between 1 and 65535"):
Connection(conn_id="test", conn_type="http", port=port)