[Notes] [Git][BuildGrid/buildgrid][master] 2 commits: Add instances to service-level operation names



Title: GitLab

Rohit Kothur pushed to branch master at BuildGrid / buildgrid

Commits:

6 changed files:

Changes:

  • buildgrid/server/execution/instance.py
    ... ... @@ -52,8 +52,6 @@ class ExecutionInstance:
    52 52
             if message_queue is not None:
    
    53 53
                 job.register_client(message_queue)
    
    54 54
     
    
    55
    -        self.logger.info("Operation name: [{}]".format(job.name))
    
    56
    -
    
    57 55
             self._scheduler.queue_job(job, skip_cache_lookup)
    
    58 56
     
    
    59 57
             return job.operation
    

  • buildgrid/server/execution/service.py
    ... ... @@ -52,8 +52,17 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    52 52
                 context.add_callback(partial(instance.unregister_message_client,
    
    53 53
                                              operation.name, message_queue))
    
    54 54
     
    
    55
    -            yield from instance.stream_operation_updates(message_queue,
    
    56
    -                                                         operation.name)
    
    55
    +            instanced_op_name = "{}/{}".format(request.instance_name,
    
    56
    +                                               operation.name)
    
    57
    +
    
    58
    +            self.logger.info("Operation name: [{}]".format(instanced_op_name))
    
    59
    +
    
    60
    +            for operation in instance.stream_operation_updates(message_queue,
    
    61
    +                                                               operation.name):
    
    62
    +                op = operations_pb2.Operation()
    
    63
    +                op.CopyFrom(operation)
    
    64
    +                op.name = instanced_op_name
    
    65
    +                yield op
    
    57 66
     
    
    58 67
             except InvalidArgumentError as e:
    
    59 68
                 self.logger.error(e)
    
    ... ... @@ -84,8 +93,12 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    84 93
                 context.add_callback(partial(instance.unregister_message_client,
    
    85 94
                                              operation_name, message_queue))
    
    86 95
     
    
    87
    -            yield from instance.stream_operation_updates(message_queue,
    
    88
    -                                                         operation_name)
    
    96
    +            for operation in instance.stream_operation_updates(message_queue,
    
    97
    +                                                               operation_name):
    
    98
    +                op = operations_pb2.Operation()
    
    99
    +                op.CopyFrom(operation)
    
    100
    +                op.name = request.name
    
    101
    +                yield op
    
    89 102
     
    
    90 103
             except InvalidArgumentError as e:
    
    91 104
                 self.logger.error(e)
    

  • buildgrid/server/operations/instance.py
    ... ... @@ -47,7 +47,13 @@ class OperationsInstance:
    47 47
             # TODO: Pages
    
    48 48
             # Spec says number of pages and length of a page are optional
    
    49 49
             response = operations_pb2.ListOperationsResponse()
    
    50
    -        response.operations.extend([job.operation for job in self._scheduler.list_jobs()])
    
    50
    +        operations = []
    
    51
    +        for job in self._scheduler.list_jobs():
    
    52
    +            op = operations_pb2.Operation()
    
    53
    +            op.CopyFrom(job.operation)
    
    54
    +            operations.append(op)
    
    55
    +
    
    56
    +        response.operations.extend(operations)
    
    51 57
     
    
    52 58
             return response
    
    53 59
     
    

  • buildgrid/server/operations/service.py
    ... ... @@ -44,13 +44,16 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    44 44
         def GetOperation(self, request, context):
    
    45 45
             try:
    
    46 46
                 name = request.name
    
    47
    -            operation_name = self._get_operation_name(name)
    
    48 47
     
    
    49
    -            instance = self._get_instance(name)
    
    48
    +            instance_name = self._parse_instance_name(name)
    
    49
    +            instance = self._get_instance(instance_name)
    
    50 50
     
    
    51
    +            operation_name = self._parse_operation_name(name)
    
    51 52
                 operation = instance.get_operation(operation_name)
    
    52
    -            operation.name = name
    
    53
    -            return operation
    
    53
    +            op = operations_pb2.Operation()
    
    54
    +            op.CopyFrom(operation)
    
    55
    +            op.name = name
    
    56
    +            return op
    
    54 57
     
    
    55 58
             except InvalidArgumentError as e:
    
    56 59
                 self.logger.error(e)
    
    ... ... @@ -61,17 +64,17 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    61 64
     
    
    62 65
         def ListOperations(self, request, context):
    
    63 66
             try:
    
    64
    -            # Name should be the collection name
    
    65
    -            # Or in this case, the instance_name
    
    66
    -            name = request.name
    
    67
    -            instance = self._get_instance(name)
    
    67
    +            # The request name should be the collection name
    
    68
    +            # In our case, this is just the instance_name
    
    69
    +            instance_name = request.name
    
    70
    +            instance = self._get_instance(instance_name)
    
    68 71
     
    
    69 72
                 result = instance.list_operations(request.filter,
    
    70 73
                                                   request.page_size,
    
    71 74
                                                   request.page_token)
    
    72 75
     
    
    73 76
                 for operation in result.operations:
    
    74
    -                operation.name = "{}/{}".format(name, operation.name)
    
    77
    +                operation.name = "{}/{}".format(instance_name, operation.name)
    
    75 78
     
    
    76 79
                 return result
    
    77 80
     
    
    ... ... @@ -85,10 +88,11 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    85 88
         def DeleteOperation(self, request, context):
    
    86 89
             try:
    
    87 90
                 name = request.name
    
    88
    -            operation_name = self._get_operation_name(name)
    
    89 91
     
    
    90
    -            instance = self._get_instance(name)
    
    92
    +            instance_name = self._parse_instance_name(name)
    
    93
    +            instance = self._get_instance(instance_name)
    
    91 94
     
    
    95
    +            operation_name = self._parse_operation_name(name)
    
    92 96
                 instance.delete_operation(operation_name)
    
    93 97
     
    
    94 98
             except InvalidArgumentError as e:
    
    ... ... @@ -101,10 +105,11 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    101 105
         def CancelOperation(self, request, context):
    
    102 106
             try:
    
    103 107
                 name = request.name
    
    104
    -            operation_name = self._get_operation_name(name)
    
    105 108
     
    
    106
    -            instance = self._get_instance(name)
    
    109
    +            instance_name = self._parse_instance_name(name)
    
    110
    +            instance = self._get_instance(instance_name)
    
    107 111
     
    
    112
    +            operation_name = self._parse_operation_name(name)
    
    108 113
                 instance.cancel_operation(operation_name)
    
    109 114
     
    
    110 115
             except NotImplementedError as e:
    
    ... ... @@ -119,20 +124,20 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    119 124
     
    
    120 125
             return Empty()
    
    121 126
     
    
    122
    -    def _get_operation_name(self, name):
    
    123
    -        return name.split("/")[-1]
    
    127
    +    def _parse_instance_name(self, name):
    
    128
    +        """ If the instance name is not blank, 'name' will have the form
    
    129
    +        {instance_name}/{operation_uuid}. Otherwise, it will just be
    
    130
    +        {operation_uuid} """
    
    131
    +        names = name.split('/')
    
    132
    +        return '/'.join(names[:-1]) if len(names) > 1 else ''
    
    133
    +
    
    134
    +    def _parse_operation_name(self, name):
    
    135
    +        names = name.split('/')
    
    136
    +        return names[-1] if len(names) > 1 else name
    
    124 137
     
    
    125 138
         def _get_instance(self, name):
    
    126 139
             try:
    
    127
    -            names = name.split("/")
    
    128
    -
    
    129
    -            # Operation name should be in format:
    
    130
    -            # {instance/name}/{operation_id}
    
    131
    -            instance_name = ''.join(names[0:-1])
    
    132
    -            if not instance_name:
    
    133
    -                return self._instances[name]
    
    134
    -
    
    135
    -            return self._instances[instance_name]
    
    140
    +            return self._instances[name]
    
    136 141
     
    
    137 142
             except KeyError:
    
    138 143
                 raise InvalidArgumentError("Instance doesn't exist on server: [{}]".format(name))

  • tests/integration/execution_service.py
    ... ... @@ -83,7 +83,8 @@ def test_execute(skip_cache_lookup, instance, context):
    83 83
         metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    84 84
         result.metadata.Unpack(metadata)
    
    85 85
         assert metadata.stage == job.OperationStage.QUEUED.value
    
    86
    -    assert uuid.UUID(result.name, version=4)
    
    86
    +    operation_uuid = result.name.split('/')[-1]
    
    87
    +    assert uuid.UUID(operation_uuid, version=4)
    
    87 88
         assert result.done is False
    
    88 89
     
    
    89 90
     
    
    ... ... @@ -108,7 +109,7 @@ def test_wait_execution(instance, controller, context):
    108 109
         j = job.Job(action, action_digest)
    
    109 110
         j._operation.done = True
    
    110 111
     
    
    111
    -    request = remote_execution_pb2.WaitExecutionRequest(name="{}/{}".format('', j.name))
    
    112
    +    request = remote_execution_pb2.WaitExecutionRequest(name=j.name)
    
    112 113
     
    
    113 114
         controller.execution_instance._scheduler.jobs[j.name] = j
    
    114 115
     
    

  • tests/integration/operations_service.py
    ... ... @@ -75,6 +75,16 @@ def instance(controller):
    75 75
             yield operation_service
    
    76 76
     
    
    77 77
     
    
    78
    +# Blank instance
    
    79
    +@pytest.fixture
    
    80
    +def blank_instance(controller):
    
    81
    +    with mock.patch.object(service, 'operations_pb2_grpc'):
    
    82
    +        operation_service = OperationsService(server)
    
    83
    +        operation_service.add_instance('', controller.operations_instance)
    
    84
    +
    
    85
    +        yield operation_service
    
    86
    +
    
    87
    +
    
    78 88
     # Queue an execution, get operation corresponding to that request
    
    79 89
     def test_get_operation(instance, controller, execute_request, context):
    
    80 90
         response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    ... ... @@ -82,14 +92,34 @@ def test_get_operation(instance, controller, execute_request, context):
    82 92
     
    
    83 93
         request = operations_pb2.GetOperationRequest()
    
    84 94
     
    
    95
    +    # The execution instance name is normally set in add_instance, but since
    
    96
    +    # we're manually creating the instance here, it doesn't get a name.
    
    97
    +    # Therefore we need to manually add the instance name to the operation
    
    98
    +    # name in the GetOperation request.
    
    85 99
         request.name = "{}/{}".format(instance_name, response_execute.name)
    
    86 100
     
    
    87 101
         response = instance.GetOperation(request, context)
    
    88
    -    assert response is response_execute
    
    102
    +    assert response.name == "{}/{}".format(instance_name, response_execute.name)
    
    103
    +    assert response.done == response_execute.done
    
    104
    +
    
    105
    +
    
    106
    +# Queue an execution, get operation corresponding to that request
    
    107
    +def test_get_operation_blank(blank_instance, controller, execute_request, context):
    
    108
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    109
    +                                                             execute_request.skip_cache_lookup)
    
    110
    +
    
    111
    +    request = operations_pb2.GetOperationRequest()
    
    112
    +
    
    113
    +    request.name = response_execute.name
    
    114
    +
    
    115
    +    response = blank_instance.GetOperation(request, context)
    
    116
    +    assert response.name == response_execute.name
    
    117
    +    assert response.done == response_execute.done
    
    89 118
     
    
    90 119
     
    
    91 120
     def test_get_operation_fail(instance, context):
    
    92 121
         request = operations_pb2.GetOperationRequest()
    
    122
    +
    
    93 123
         request.name = "{}/{}".format(instance_name, "runner")
    
    94 124
         instance.GetOperation(request, context)
    
    95 125
     
    
    ... ... @@ -110,6 +140,18 @@ def test_list_operations(instance, controller, execute_request, context):
    110 140
         request = operations_pb2.ListOperationsRequest(name=instance_name)
    
    111 141
         response = instance.ListOperations(request, context)
    
    112 142
     
    
    143
    +    names = response.operations[0].name.split('/')
    
    144
    +    assert names[0] == instance_name
    
    145
    +    assert names[1] == response_execute.name
    
    146
    +
    
    147
    +
    
    148
    +def test_list_operations_blank(blank_instance, controller, execute_request, context):
    
    149
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    150
    +                                                             execute_request.skip_cache_lookup)
    
    151
    +
    
    152
    +    request = operations_pb2.ListOperationsRequest(name='')
    
    153
    +    response = blank_instance.ListOperations(request, context)
    
    154
    +
    
    113 155
         assert response.operations[0].name.split('/')[-1] == response_execute.name
    
    114 156
     
    
    115 157
     
    
    ... ... @@ -160,15 +202,30 @@ def test_list_operations_empty(instance, context):
    160 202
     def test_delete_operation(instance, controller, execute_request, context):
    
    161 203
         response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    162 204
                                                                  execute_request.skip_cache_lookup)
    
    205
    +
    
    163 206
         request = operations_pb2.DeleteOperationRequest()
    
    164
    -    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    207
    +    request.name = response_execute.name
    
    165 208
         instance.DeleteOperation(request, context)
    
    166 209
     
    
    167
    -    request = operations_pb2.GetOperationRequest()
    
    168
    -    request.name = "{}/{}".format(instance_name, response_execute.name)
    
    210
    +    request_name = "{}/{}".format(instance_name, response_execute.name)
    
    169 211
     
    
    170 212
         with pytest.raises(InvalidArgumentError):
    
    171
    -        controller.operations_instance.get_operation(response_execute.name)
    
    213
    +        controller.operations_instance.get_operation(request_name)
    
    214
    +
    
    215
    +
    
    216
    +# Send execution off, delete, try to find operation should fail
    
    217
    +def test_delete_operation_blank(blank_instance, controller, execute_request, context):
    
    218
    +    response_execute = controller.execution_instance.execute(execute_request.action_digest,
    
    219
    +                                                             execute_request.skip_cache_lookup)
    
    220
    +
    
    221
    +    request = operations_pb2.DeleteOperationRequest()
    
    222
    +    request.name = response_execute.name
    
    223
    +    blank_instance.DeleteOperation(request, context)
    
    224
    +
    
    225
    +    request_name = response_execute.name
    
    226
    +
    
    227
    +    with pytest.raises(InvalidArgumentError):
    
    228
    +        controller.operations_instance.get_operation(request_name)
    
    172 229
     
    
    173 230
     
    
    174 231
     def test_delete_operation_fail(instance, context):
    
    ... ... @@ -187,6 +244,14 @@ def test_cancel_operation(instance, context):
    187 244
         context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    188 245
     
    
    189 246
     
    
    247
    +def test_cancel_operation_blank(blank_instance, context):
    
    248
    +    request = operations_pb2.CancelOperationRequest()
    
    249
    +    request.name = "runner"
    
    250
    +    blank_instance.CancelOperation(request, context)
    
    251
    +
    
    252
    +    context.set_code.assert_called_once_with(grpc.StatusCode.UNIMPLEMENTED)
    
    253
    +
    
    254
    +
    
    190 255
     def test_cancel_operation_instance_fail(instance, context):
    
    191 256
         request = operations_pb2.CancelOperationRequest()
    
    192 257
         instance.CancelOperation(request, context)
    



[Date Prev] [Date Next] [Thread Prev] [Thread Next] [Thread Index] [Date Index] [Author Index]