Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 43 additions & 27 deletions bugwarrior/collect.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from collections.abc import Iterator
from collections.abc import Iterable, Iterator
import copy
from functools import cache
from importlib.metadata import entry_points
import json
import logging
import multiprocessing
import time
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, NamedTuple

from jinja2 import Template
from taskw.task import Task

from bugwarrior.config import get_service

if TYPE_CHECKING:
from bugwarrior.config.validation import Config
from bugwarrior.services import Issue, Service
Expand All @@ -21,24 +22,15 @@
SERVICE_FINISHED_ERROR = 1


@cache
def get_service(service_name: str) -> type["Service"]:
try:
(service,) = entry_points(group='bugwarrior.service', name=service_name)
except ValueError as e:
if service_name in [
'activecollab',
'activecollab2',
'megaplan',
'teamlab',
'versionone',
]:
log.warning(f"The {service_name} service has been removed.")
raise ValueError(
f"Configured service '{service_name}' not found. "
"Is it installed? Or misspelled?"
) from e
return service.load()
class CollectedIssue(NamedTuple):
task_data: dict[str, Any]
target: str
identifier: str


class CollectionErrorData(NamedTuple):
error_message: str
target: str


def get_service_instances(conf: "Config") -> list["Service"]:
Expand Down Expand Up @@ -81,7 +73,9 @@ def _aggregate_issues(service: "Service", queue: multiprocessing.Queue) -> None:
log.info(f"Done with [{target}] in {duration}.")


def aggregate_issues(conf: "Config", debug: bool) -> Iterator[dict | tuple[str, str]]:
def aggregate_issues(
conf: "Config", debug: bool
) -> Iterator[CollectedIssue | CollectionErrorData]:
"""Return all issues from every target."""
log.info("Starting to aggregate remote issues.")

Expand Down Expand Up @@ -111,22 +105,32 @@ def aggregate_issues(conf: "Config", debug: bool) -> Iterator[dict | tuple[str,
while currently_running > 0:
issue = queue.get(True)
try:
record = TaskConstructor(issue).get_taskwarrior_record()
record['target'] = issue.config.target
yield record
yield TaskConstructor(issue).get_data_to_sync()
except AttributeError:
if isinstance(issue, tuple):
currently_running -= 1
completion_type, target = issue
if completion_type == SERVICE_FINISHED_ERROR:
log.error(f"Aborted [{target}] due to critical error.")
yield ('SERVICE FAILED', target)
yield CollectionErrorData('SERVICE FAILED', target)
continue
raise

log.info("Done aggregating remote issues.")


def make_unique_identifier(
unique_keys: Iterable[str], task_data: dict[str, Any]
) -> str:
"""For a given issue, make an identifier from its unique keys.

This is not the same as the taskwarrior uuid, which is assigned
only once the task is created.
"""
subset = {key: task_data[key] for key in unique_keys}
return json.dumps(subset, sort_keys=True)


class TaskConstructor:
"""Construct a taskwarrior task from a foreign record."""

Expand All @@ -152,6 +156,10 @@ def get_taskwarrior_record(self, refined: bool = True) -> dict[str, Any]:
record['tags'] = []
if refined:
record['tags'].extend(self.get_added_tags())

# Blank priority should mean *no* priority
if record['priority'] == '':
record['priority'] = None
return record

def get_template_context(self) -> dict[str, Any]:
Expand All @@ -168,3 +176,11 @@ def refine_record(self, record: dict[str, Any]) -> dict[str, Any]:
elif field == 'description':
record['description'] = self.issue.get_default_description()
return record

def get_data_to_sync(self) -> CollectedIssue:
task_data = self.get_taskwarrior_record()
return CollectedIssue(
task_data=task_data,
identifier=make_unique_identifier(self.issue.UNIQUE_KEY, task_data),
target=self.issue.config.target,
)
4 changes: 2 additions & 2 deletions bugwarrior/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from lockfile import LockTimeout
from lockfile.pidlockfile import PIDLockFile

from bugwarrior.collect import aggregate_issues, get_service
from bugwarrior.config import get_config_path, get_keyring, load_config
from bugwarrior.collect import aggregate_issues
from bugwarrior.config import get_config_path, get_keyring, get_service, load_config
from bugwarrior.db import get_defined_udas_as_strings, synchronize

if TYPE_CHECKING:
Expand Down
1 change: 1 addition & 0 deletions bugwarrior/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
UnsupportedOption, # noqa: F401
)
from .secrets import get_keyring # noqa: F401
from .validation import get_service # noqa:F401

# NOTE: __all__ determines the stable, public API.
__all__ = [BugwarriorData.__name__, MainSectionConfig.__name__, ServiceConfig.__name__]
25 changes: 23 additions & 2 deletions bugwarrior/config/validation.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
from functools import cache
from importlib.metadata import entry_points
import logging
import sys
from typing import TYPE_CHECKING, Annotated, Any, NoReturn, Union

from pydantic import Field, TypeAdapter, ValidationError
from pydantic_core import ErrorDetails

from bugwarrior.collect import get_service

from .schema import BaseConfig, Hooks, MainSectionConfig, Notifications, ServiceConfig

if TYPE_CHECKING:
ServiceConfigType = ServiceConfig
from bugwarrior.services import Service


@cache
def get_service(service_name: str) -> type["Service"]:
try:
(service,) = entry_points(group='bugwarrior.service', name=service_name)
except ValueError as e:
if service_name in [
'activecollab',
'activecollab2',
'megaplan',
'teamlab',
'versionone',
]:
log.warning(f"The {service_name} service has been removed.")
raise ValueError(
f"Configured service '{service_name}' not found. "
"Is it installed? Or misspelled?"
) from e
return service.load()


log = logging.getLogger(__name__)
Expand Down
52 changes: 15 additions & 37 deletions bugwarrior/db.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from collections.abc import Collection, Iterable, Iterator
import itertools
import json
import logging
import re
import subprocess
Expand All @@ -9,7 +8,8 @@
from taskw import TaskWarriorShellout
from taskw.exceptions import TaskwarriorError

from bugwarrior.collect import get_service
from bugwarrior.collect import CollectedIssue, CollectionErrorData
from bugwarrior.config import get_service
from bugwarrior.notifications import send_notification

if TYPE_CHECKING:
Expand Down Expand Up @@ -43,21 +43,6 @@ def get_managed_task_uuids(
return expected_task_ids


def make_unique_identifier(
unique_key_sets: Iterable[Collection[str]], issue: dict[str, Any]
) -> str:
"""For a given issue, make an identifier from its unique keys.

This is not the same as the taskwarrior uuid, which is assigned
only once the task is created.
"""
for unique_keys in unique_key_sets:
if all(key in issue for key in unique_keys):
subset = {key: issue[key] for key in unique_keys}
return json.dumps(subset, sort_keys=True)
raise RuntimeError("Could not determine unique identifier for %s" % issue)


def find_taskwarrior_uuid(
tw: TaskWarriorShellout,
unique_key_sets: Iterable[Collection[str]],
Expand Down Expand Up @@ -175,7 +160,7 @@ def run_hooks(pre_import: list[str]) -> None:


def synchronize(
issue_generator: Iterable[dict | tuple[str, str]],
issue_generator: Iterator[CollectedIssue | CollectionErrorData],
conf: "Config",
dry_run: bool = False,
) -> None:
Expand Down Expand Up @@ -207,27 +192,25 @@ def synchronize(
}

for issue in issue_generator:
if isinstance(issue, tuple):
assert issue[0] == 'SERVICE FAILED', (
"'issue' should only be a tuple in case of a failure"
)
successful_config_map.pop(issue[1])
if isinstance(issue, CollectionErrorData):
successful_config_map.pop(issue.target)
continue

# De-duplicate issues coming in
unique_identifier = make_unique_identifier(unique_key_sets, issue)
if unique_identifier in issue_map:
log.debug(f"Merging tags and skipping. Seen {unique_identifier} of {issue}")
if issue.identifier in issue_map:
log.debug(f"Merging tags and skipping. Seen {issue.identifier} of {issue}")
# Merge and deduplicate tags.
issue_map[unique_identifier]['tags'] += issue['tags']
issue_map[unique_identifier]['tags'] = list(
set(issue_map[unique_identifier]['tags'])
new_tags = sorted(
set(issue_map[issue.identifier].task_data['tags'])
| set(issue.task_data['tags'])
)
issue_map[issue.identifier].task_data['tags'] = new_tags

else:
issue_map[unique_identifier] = issue
issue_map[issue.identifier] = issue

seen_uuids = set()
for issue in issue_map.values():
for issue, target, _ in issue_map.values():
# We received this issue from The Internet, but we're not sure what
# kind of encoding the service providers may have handed us. Let's try
# and decode all byte strings from UTF8 off the bat. If we encounter
Expand All @@ -240,12 +223,7 @@ def synchronize(
except UnicodeDecodeError:
log.warning("Failed to interpret %r as utf-8" % key)

# Blank priority should mean *no* priority
if issue['priority'] == '':
issue['priority'] = None

# Target was only tacked on to pass configuration to this function.
service_config = successful_config_map[issue.pop('target')]
service_config = successful_config_map[target]

try:
existing_taskwarrior_uuid = find_taskwarrior_uuid(
Expand Down
Loading
Loading