diff --git a/airflow-core/src/airflow/utils/log/file_processor_handler.py b/airflow-core/src/airflow/utils/log/file_processor_handler.py index 2ef63d55c9c97..82c671feddb8d 100644 --- a/airflow-core/src/airflow/utils/log/file_processor_handler.py +++ b/airflow-core/src/airflow/utils/log/file_processor_handler.py @@ -46,7 +46,7 @@ def __init__(self, base_log_folder, filename_template): super().__init__() self.handler = None self.base_log_folder = base_log_folder - self.dag_dir = os.path.expanduser(settings.DAGS_FOLDER) + self.dag_dir = str(Path(settings.DAGS_FOLDER).expanduser()) self.filename_template, self.filename_jinja_template = parse_template_string(filename_template) self._cur_date = datetime.today() @@ -94,9 +94,9 @@ def _render_filename(self, filename): airflow_directory = airflow.__path__[0] if filename.startswith(airflow_directory): - filename = os.path.join("native_dags", os.path.relpath(filename, airflow_directory)) + filename = str(Path("native_dags") / Path(filename).relative_to(airflow_directory)) else: - filename = os.path.relpath(filename, self.dag_dir) + filename = str(Path(filename).relative_to(self.dag_dir)) ctx = {"filename": filename} if self.filename_jinja_template: @@ -105,7 +105,7 @@ def _render_filename(self, filename): return self.filename_template.format(filename=ctx["filename"]) def _get_log_directory(self): - return os.path.join(self.base_log_folder, timezone.utcnow().strftime("%Y-%m-%d")) + return str(Path(self.base_log_folder) / timezone.utcnow().strftime("%Y-%m-%d")) def _symlink_latest_log_directory(self): """ @@ -115,17 +115,17 @@ def _symlink_latest_log_directory(self): :return: None """ - log_directory = self._get_log_directory() - latest_log_directory_path = os.path.join(self.base_log_folder, "latest") - if os.path.isdir(log_directory): - rel_link_target = Path(log_directory).relative_to(Path(latest_log_directory_path).parent) + log_directory = Path(self._get_log_directory()) + latest_log_directory_path = Path(self.base_log_folder) / "latest" + if log_directory.is_dir(): + rel_link_target = log_directory.relative_to(latest_log_directory_path.parent) try: # if symlink exists but is stale, update it - if os.path.islink(latest_log_directory_path): - if os.path.realpath(latest_log_directory_path) != log_directory: + if latest_log_directory_path.is_symlink(): + if latest_log_directory_path.resolve() != log_directory.resolve(): os.unlink(latest_log_directory_path) os.symlink(rel_link_target, latest_log_directory_path) - elif os.path.isdir(latest_log_directory_path) or os.path.isfile(latest_log_directory_path): + elif latest_log_directory_path.is_dir() or latest_log_directory_path.is_file(): logger.warning( "%s already exists as a dir/file. Skip creating symlink.", latest_log_directory_path ) @@ -141,13 +141,13 @@ def _init_file(self, filename): :param filename: task instance object :return: relative log path of the given task instance """ - relative_log_file_path = os.path.join(self._get_log_directory(), self._render_filename(filename)) - log_file_path = os.path.abspath(relative_log_file_path) - directory = os.path.dirname(log_file_path) + relative_log_file_path = Path(self._get_log_directory()) / self._render_filename(filename) + log_file_path = relative_log_file_path.absolute() + directory = log_file_path.parent - Path(directory).mkdir(parents=True, exist_ok=True) + directory.mkdir(parents=True, exist_ok=True) - if not os.path.exists(log_file_path): + if not log_file_path.exists(): open(log_file_path, "a").close() - return log_file_path + return str(log_file_path)