[Notes] [Git][BuildGrid/buildgrid][finn/async] 2 commits: Update CONTRIBUTING.rst



Title: GitLab

finnball pushed to branch finn/async at BuildGrid / buildgrid

Commits:

6 changed files:

Changes:

  • CONTRIBUTING.rst
    ... ... @@ -9,8 +9,23 @@ We welcome contributions in the form of bug fixes or feature additions / enhance
    9 9
     
    
    10 10
     Any major feature additions should be raised as a proposal on the `Mailing List <https://lists.buildgrid.build/cgi-bin/mailman/listinfo/buildgrid/>`_ to be discussed, and then eventually followed up with an issue here on gitlab. We recommend that you propose the feature in advance of commencing work. We are also on irc, but do not have our own dedicated channel - you can find us on #buildstream on GIMPNet and #bazel on freenode.
    
    11 11
     
    
    12
    -The author of any patch is expected to take ownership of that code and is to support it for a reasonable
    
    13
    -time-frame. This means addressing any unforeseen side effects and quirks the feature may have introduced.
    
    12
    +The author of any patch is expected to take ownership of that code and is to support it for a reasonable time-frame. This means addressing any unforeseen side effects and quirks the feature may have introduced. More on this below in 'Granting Committer Access'.
    
    13
    +
    
    14
    +Granting Committer Access
    
    15
    +-------------------------
    
    16
    +
    
    17
    +We'll hand out commit access to anyone who has successfully landed a single patch to the code base. Please request this via irc or the mailing list.
    
    18
    +
    
    19
    +This of course relies on contributors being responsive and show willingness to address problems after landing branches there should not be any problems here.
    
    20
    +
    
    21
    +What we are expecting of committers here in general is basically to
    
    22
    +escalate the review in cases of uncertainty:
    
    23
    +
    
    24
    +* If the patch/branch is very trivial (obvious few line changes or typos etc), and you are confident of the change, there is no need for review.
    
    25
    +
    
    26
    +* If the patch/branch is non trivial, please obtain a review from another committer who is familiar with the area which the branch effects. An approval from someone who is not the patch author will be needed before any merge. 
    
    27
    +
    
    28
    +We don't have any detailed policy for "bad actors", but will of course handle things on a case by case basis - commit access should not result in commit wars or be used as a tool to subvert the project when disagreements arise, such incidents (if any) would surely lead to temporary suspension of commit rights.
    
    14 29
     
    
    15 30
     Patch Submissions
    
    16 31
     -----------------
    

  • buildgrid/server/execution/execution_instance.py
    ... ... @@ -34,12 +34,12 @@ class ExecutionInstance():
    34 34
             self.logger = logging.getLogger(__name__)
    
    35 35
             self._scheduler = scheduler
    
    36 36
     
    
    37
    -    def execute(self, action_digest, skip_cache_lookup):
    
    37
    +    def execute(self, action_digest, skip_cache_lookup, message_queue=None):
    
    38 38
             """ Sends a job for execution.
    
    39 39
             Queues an action and creates an Operation instance to be associated with
    
    40 40
             this action.
    
    41 41
             """
    
    42
    -        job = Job(action_digest)
    
    42
    +        job = Job(action_digest, message_queue)
    
    43 43
             self.logger.info("Operation name: {}".format(job.name))
    
    44 44
     
    
    45 45
             if not skip_cache_lookup:
    
    ... ... @@ -70,3 +70,15 @@ class ExecutionInstance():
    70 70
         def cancel_operation(self, name):
    
    71 71
             # TODO: Cancel leases
    
    72 72
             raise NotImplementedError("Cancelled operations not supported")
    
    73
    +
    
    74
    +    def register_message_client(self, name, queue):
    
    75
    +        try:
    
    76
    +            self._scheduler.register_client(name, queue)
    
    77
    +        except KeyError:
    
    78
    +            raise InvalidArgumentError("Operation name does not exist: {}".format(name))
    
    79
    +
    
    80
    +    def unregister_message_client(self, name, queue):
    
    81
    +        try:
    
    82
    +            self._scheduler.unregister_client(name, queue)
    
    83
    +        except KeyError:
    
    84
    +            raise InvalidArgumentError("Operation name does not exist: {}".format(name))

  • buildgrid/server/execution/execution_service.py
    ... ... @@ -22,10 +22,9 @@ ExecutionService
    22 22
     Serves remote execution requests.
    
    23 23
     """
    
    24 24
     
    
    25
    -import copy
    
    26 25
     import grpc
    
    27 26
     import logging
    
    28
    -import time
    
    27
    +import queue
    
    29 28
     
    
    30 29
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remote_execution_pb2_grpc
    
    31 30
     from buildgrid._protos.google.longrunning import operations_pb2_grpc, operations_pb2
    
    ... ... @@ -35,17 +34,23 @@ from ._exceptions import InvalidArgumentError
    35 34
     class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    
    36 35
     
    
    37 36
         def __init__(self, instance):
    
    38
    -        self._instance = instance
    
    39 37
             self.logger = logging.getLogger(__name__)
    
    38
    +        self._instance = instance
    
    40 39
     
    
    41 40
         def Execute(self, request, context):
    
    42 41
             # Ignore request.instance_name for now
    
    43 42
             # Have only one instance
    
    44 43
             try:
    
    44
    +            message_queue = queue.Queue()
    
    45 45
                 operation = self._instance.execute(request.action_digest,
    
    46
    -                                               request.skip_cache_lookup)
    
    46
    +                                               request.skip_cache_lookup,
    
    47
    +                                               message_queue)
    
    47 48
     
    
    48
    -            yield from self._stream_operation_updates(operation.name)
    
    49
    +            remove_client = lambda : self._remove_client(operation.name, message_queue)
    
    50
    +            context.add_callback(remove_client)
    
    51
    +
    
    52
    +            yield from self._stream_operation_updates(message_queue,
    
    53
    +                                                      operation.name)
    
    49 54
     
    
    50 55
             except InvalidArgumentError as e:
    
    51 56
                 self.logger.error(e)
    
    ... ... @@ -59,19 +64,28 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    59 64
     
    
    60 65
         def WaitExecution(self, request, context):
    
    61 66
             try:
    
    62
    -            yield from self._stream_operation_updates(request.name)
    
    67
    +            message_queue = queue.Queue()
    
    68
    +            operation_name = request.name
    
    69
    +
    
    70
    +            self._instance.register_message_client(operation_name, message_queue)
    
    71
    +
    
    72
    +            remove_client = lambda : self._remove_client(operation_name, message_queue)
    
    73
    +            context.add_callback(remove_client)
    
    74
    +
    
    75
    +            yield from self._stream_operation_updates(message_queue,
    
    76
    +                                                      operation_name)
    
    63 77
     
    
    64 78
             except InvalidArgumentError as e:
    
    65 79
                 self.logger.error(e)
    
    66 80
                 context.set_details(str(e))
    
    67 81
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    68 82
     
    
    69
    -    def _stream_operation_updates(self, name):
    
    70
    -        stream_previous = None
    
    71
    -        while True:
    
    72
    -            stream = self._instance.get_operation(name)
    
    73
    -            if stream != stream_previous:
    
    74
    -                yield stream
    
    75
    -                if stream.done == True: break
    
    76
    -                stream_previous = copy.deepcopy(stream)
    
    77
    -            time.sleep(1)
    83
    +    def _remove_client(self, operation_name, message_queue):
    
    84
    +        self._instance.unregister_message_client(operation_name, message_queue)
    
    85
    +
    
    86
    +    def _stream_operation_updates(self, message_queue, operation_name):
    
    87
    +        operation = message_queue.get()
    
    88
    +        while not operation.done:
    
    89
    +            yield operation
    
    90
    +            operation = message_queue.get()
    
    91
    +        yield operation

  • buildgrid/server/job.py
    ... ... @@ -51,9 +51,8 @@ class LeaseState(Enum):
    51 51
     
    
    52 52
     class Job():
    
    53 53
     
    
    54
    -    def __init__(self, action):
    
    55
    -        self.action = action
    
    56
    -        self.bot_status = BotStatus.BOT_STATUS_UNSPECIFIED
    
    54
    +    def __init__(self, action_digest, message_queue=None):
    
    55
    +        self.action_digest = action_digest
    
    57 56
             self.execute_stage = ExecuteStage.UNKNOWN
    
    58 57
             self.lease = None
    
    59 58
             self.logger = logging.getLogger(__name__)
    
    ... ... @@ -62,10 +61,24 @@ class Job():
    62 61
     
    
    63 62
             self._n_tries = 0
    
    64 63
             self._operation = operations_pb2.Operation(name = self.name)
    
    64
    +        self._operation_update_queues = []
    
    65
    +
    
    66
    +        if message_queue is not None:
    
    67
    +            self.register_client(message_queue)
    
    68
    +
    
    69
    +    def check_job_finished(self):
    
    70
    +        if not self._operation_update_queues:
    
    71
    +            return self._operation.done
    
    72
    +        return False
    
    73
    +
    
    74
    +    def register_client(self, queue):
    
    75
    +        self._operation_update_queues.append(queue)
    
    76
    +
    
    77
    +    def unregister_client(self, queue):
    
    78
    +        self._operation_update_queues.remove(queue)
    
    65 79
     
    
    66 80
         def get_operation(self):
    
    67 81
             self._operation.metadata.CopyFrom(self._pack_any(self.get_operation_meta()))
    
    68
    -
    
    69 82
             if self.result is not None:
    
    70 83
                 self._operation.done = True
    
    71 84
                 response = ExecuteResponse()
    
    ... ... @@ -81,10 +94,10 @@ class Job():
    81 94
             return meta
    
    82 95
     
    
    83 96
         def create_lease(self):
    
    84
    -        action = self._pack_any(self.action)
    
    97
    +        action_digest = self._pack_any(self.action_digest)
    
    85 98
     
    
    86 99
             lease = bots_pb2.Lease(id = self.name,
    
    87
    -                               payload = action,
    
    100
    +                               payload = action_digest,
    
    88 101
                                    state = LeaseState.PENDING.value)
    
    89 102
             self.lease = lease
    
    90 103
             return lease
    
    ... ... @@ -92,6 +105,11 @@ class Job():
    92 105
         def get_operations(self):
    
    93 106
             return operations_pb2.ListOperationsResponse(operations = [self.get_operation()])
    
    94 107
     
    
    108
    +    def update_execute_stage(self, stage):
    
    109
    +        self.execute_stage = stage
    
    110
    +        for queue in self._operation_update_queues:
    
    111
    +            queue.put(self.get_operation())
    
    112
    +
    
    95 113
         def _pack_any(self, pack):
    
    96 114
             any = any_pb2.Any()
    
    97 115
             any.Pack(pack)
    

  • buildgrid/server/scheduler.py
    ... ... @@ -35,8 +35,17 @@ class Scheduler():
    35 35
             self.jobs = {}
    
    36 36
             self.queue = deque()
    
    37 37
     
    
    38
    +    def register_client(self, name, queue):
    
    39
    +        self.jobs[name].register_client(queue)
    
    40
    +
    
    41
    +    def unregister_client(self, name, queue):
    
    42
    +        job = self.jobs[name]
    
    43
    +        job.unregister_client(queue)
    
    44
    +        if job.check_job_finished():
    
    45
    +            del self.jobs[name]
    
    46
    +
    
    38 47
         def append_job(self, job):
    
    39
    -        job.execute_stage = ExecuteStage.QUEUED
    
    48
    +        job.update_execute_stage(ExecuteStage.QUEUED)
    
    40 49
             self.jobs[job.name] = job
    
    41 50
             self.queue.append(job)
    
    42 51
     
    
    ... ... @@ -45,9 +54,9 @@ class Scheduler():
    45 54
     
    
    46 55
             if job.n_tries >= self.MAX_N_TRIES:
    
    47 56
                 # TODO: Decide what to do with these jobs
    
    48
    -            job.execute_stage = ExecuteStage.COMPLETED
    
    57
    +            job.update_execute_stage(ExecuteStage.COMPLETED)
    
    49 58
             else:
    
    50
    -            job.execute_stage = ExecuteStage.QUEUED
    
    59
    +            job.update_execute_stage(ExecuteStage.QUEUED)
    
    51 60
                 job.n_tries += 1
    
    52 61
                 self.queue.appendleft(job)
    
    53 62
     
    
    ... ... @@ -56,15 +65,14 @@ class Scheduler():
    56 65
         def create_job(self):
    
    57 66
             if len(self.queue) > 0:
    
    58 67
                 job = self.queue.popleft()
    
    59
    -            job.execute_stage = ExecuteStage.EXECUTING
    
    68
    +            job.update_execute_stage(ExecuteStage.EXECUTING)
    
    60 69
                 self.jobs[job.name] = job
    
    61 70
                 return job
    
    62
    -        return None
    
    63 71
     
    
    64 72
         def job_complete(self, name, result):
    
    65 73
             job = self.jobs[name]
    
    66
    -        job.execute_stage = ExecuteStage.COMPLETED
    
    67 74
             job.result = result
    
    75
    +        job.update_execute_stage(ExecuteStage.COMPLETED)
    
    68 76
             self.jobs[name] = job
    
    69 77
     
    
    70 78
         def get_operations(self):
    
    ... ... @@ -122,3 +130,7 @@ class Scheduler():
    122 130
             if state == LeaseState.PENDING.value or \
    
    123 131
                state == LeaseState.ACTIVE.value:
    
    124 132
                 self.retry_job(name)
    
    133
    +
    
    134
    +    def _update_execute_stage(self, job, stage):
    
    135
    +        job.update_execute_stage(stage)
    
    136
    +        return job

  • tests/integration/execution_service.py
    ... ... @@ -69,17 +69,22 @@ def test_execute(skip_cache_lookup, instance, context):
    69 69
             assert result.done is False
    
    70 70
     
    
    71 71
     def test_wait_execution(instance, context):
    
    72
    +    # TODO: Figure out why next(response) hangs on the .get()
    
    73
    +    # method when running in pytest.
    
    72 74
         action_digest = remote_execution_pb2.Digest()
    
    73 75
         action_digest.hash = 'zhora'
    
    74 76
     
    
    75
    -    execution_request = remote_execution_pb2.ExecuteRequest(instance_name = '',
    
    76
    -                                                            action_digest = action_digest,
    
    77
    -                                                            skip_cache_lookup = True)
    
    78
    -    execution_response = next(instance.Execute(execution_request, context))
    
    77
    +    j = job.Job(action_digest, None)
    
    78
    +    j._operation.done = True
    
    79 79
     
    
    80
    +    request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    80 81
     
    
    81
    -    request = remote_execution_pb2.WaitExecutionRequest(name=execution_response.name)
    
    82
    +    instance._instance._scheduler.jobs[j.name] = j
    
    82 83
     
    
    83
    -    response = next(instance.WaitExecution(request, context))
    
    84
    +    action_result_any = any_pb2.Any()
    
    85
    +    action_result = remote_execution_pb2.ActionResult()
    
    86
    +    action_result_any.Pack(action_result)
    
    84 87
     
    
    85
    -    assert response == execution_response
    88
    +    instance._instance._scheduler._update_execute_stage(j, job.ExecuteStage.COMPLETED)
    
    89
    +
    
    90
    +    response = instance.WaitExecution(request, context)



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]