[Git][BuildStream/buildstream][135-expire-artifacts-in-local-cache] _context.py: Add simpler message handlers



Hi,

 

Please check below what GitLab can send to the mailing list by default. The interesting options available are:

 

* Whether or not to include the diffs

* Include more than one recipient. In this case we are talking about the mailing list.

* More information about the feature: https://docs.gitlab.com/ee/user/project/integrations/emails_on_push.html

 

As you can see, these mails can be easily filtered since the Subject starts with

 

[Git][Group/project]

 

Please let me know if this is what you want me to activate.

 

Further functionality requires the usage of Webhooks. I would need the help of an engineer for doing more complex/customized stuff.

 

######

 

 

 

> Tristan Maat pushed to branch 135-expire-artifacts-in-local-cache at

> BuildStream / buildstream

>

>

> Commits:

> a6a7a01f by Tristan Maat at 2018-07-10T14:25:44Z

> _context.py: Add simpler message handlers

>

> - - - - -

>

>

> 13 changed files:

>

> - buildstream/_artifactcache/artifactcache.py

> - buildstream/_context.py

> - buildstream/_scheduler/jobs/cachesizejob.py

> - buildstream/_scheduler/jobs/cleanupjob.py

> - buildstream/_scheduler/jobs/job.py

> - buildstream/_scheduler/queues/buildqueue.py

> - buildstream/_scheduler/queues/fetchqueue.py

> - buildstream/_scheduler/queues/pullqueue.py

> - buildstream/_scheduler/queues/pushqueue.py

> - buildstream/_scheduler/queues/queue.py

> - buildstream/_scheduler/queues/trackqueue.py

> - buildstream/_scheduler/resources.py

> - buildstream/_scheduler/scheduler.py

>

>

> Changes:

>

> =====================================

> buildstream/_artifactcache/artifactcache.py

> =====================================

> @@ -217,8 +217,10 @@ class ArtifactCache():

> default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],

> 'buildstream.conf')

> detail = ("There is not enough space to build the given

> element.\n" - "Please increase the cache-quota in

> {}." - .format(self.context.config_origin or

> default_conf)) + "Please increase the cache-quota

> in {}.\n" + "Space usage: {:,} B/{:,} B"

> + .format(self.context.config_origin or

> default_conf, +

> self.calculate_cache_size(), self.context.cache_quota))

>

> if self.calculate_cache_size() > self.context.cache_quota:

> raise ArtifactError("Cache too full. Aborting.",

>

>

> =====================================

> buildstream/_context.py

> =====================================

> @@ -393,6 +393,30 @@ class Context():

> self._message_handler(message, context=self)

> return

>

> + def msg(self, text, *, plugin=None, msg_type=None, **kwargs):

> + self.message(Message(plugin, msg_type, str(text), **kwargs))

> +

> + def debug(self, text, *, plugin=None, **kwargs):

> + self.msg(text, plugin=plugin, msg_type=MessageType.DEBUG, **kwargs)

> +

> + def status(self, text, *, plugin=None, **kwargs):

> + self.msg(text, plugin=plugin, msg_type=MessageType.STATUS,

> **kwargs) +

> + def info(self, text, *, plugin=None, **kwargs):

> + self.msg(text, plugin=plugin, msg_type=MessageType.INFO, **kwargs)

> +

> + def warn(self, text, *, plugin=None, **kwargs):

> + self.msg(text, plugin=plugin, msg_type=MessageType.WARN, **kwargs)

> +

> + def error(self, text, *, plugin=None, **kwargs):

> + self.msg(text, plugin=plugin, msg_type=MessageType.ERROR, **kwargs)

> +

> + def bug(self, text, *, plugin=None, **kwargs):

> + self.msg(text, plugin=plugin, msg_type=MessageType.BUG, **kwargs)

> +

> + def log(self, text, *, plugin=None, **kwargs):

> + self.msg(text, plugin=plugin, msg_type=MessageType.LOG, **kwargs)

> +

> # silence()

> #

> # A context manager to silence messages, this behaves in

>

>

> =====================================

> buildstream/_scheduler/jobs/cachesizejob.py

> =====================================

> @@ -35,7 +35,8 @@ class CacheSizeJob(Job):

>

> def _parent_complete(self, success, result):

> self._cache._set_cache_size(result)

> - self._complete_cb(result)

> + if self._complete_cb:

> + self._complete_cb(result)

>

> @contextmanager

> def _child_logging_enabled(self, logfile):

>

>

> =====================================

> buildstream/_scheduler/jobs/cleanupjob.py

> =====================================

> @@ -35,7 +35,8 @@ class CleanupJob(Job):

>

> def _parent_complete(self, success, result):

> self._cache._set_cache_size(result)

> - self._complete_cb()

> + if self._complete_cb:

> + self._complete_cb()

>

> @contextmanager

> def _child_logging_enabled(self, logfile):

>

>

> =====================================

> buildstream/_scheduler/jobs/job.py

> =====================================

> @@ -77,17 +77,16 @@ class Job():

> resources=None, exclusive_resources=None, max_retries=0):

>

> if resources is None:

> - resources = []

> + resources = set()

> else:

> resources = set(resources)

> if exclusive_resources is None:

> - exclusive_resources = []

> + exclusive_resources = set()

> else:

> - resources = set(resources)

> + exclusive_resources = set(resources)

>

> # Ensure nobody tries not use an exclusive resource.

> - assert(exclusive_resources <= resources,

> - "All exclusive resources must also be resources!")

> + assert exclusive_resources <= resources, "All exclusive resources

> must also be resources!"

>

> #

> # Public members

>

>

> =====================================

> buildstream/_scheduler/queues/buildqueue.py

> =====================================

> @@ -18,10 +18,8 @@

> # Tristan Van Berkom <tristan vanberkom codethink co uk>

> # Jürg Billeter <juerg billeter codethink co uk>

>

> -import os

> from . import Queue, QueueStatus

> -from .. import ResourceType

> -from ..jobs import CacheSizeJob, CleanupJob

> +from ..resources import ResourceType

>

>

> # A queue which assembles elements

> @@ -30,7 +28,7 @@ class BuildQueue(Queue):

>

> action_name = "Build"

> complete_name = "Built"

> - queue_type = ResourceType.PROCESS

> + resources = [ResourceType.PROCESS]

>

> def process(self, element):

> element._assemble()

> @@ -53,29 +51,6 @@ class BuildQueue(Queue):

>

> return QueueStatus.READY

>

> - def _run_cleanup_if_required(self, cache_size):

> - if cache_size < self._scheduler.context.cache_quota:

> - return

> -

> - logpath = os.path.join(self._scheduler.context.logdir,

> 'cleanup.{pid}.log') - cleanup_job = CleanupJob(self._scheduler,

> - 'clean', logpath,

> - resources=[ResourceType.CACHE,

> - ResourceType.PROCESS],

> - exclusive_resources=[ResourceType.CACHE],

> - complete_cb=lambda: 1)

> - self._scheduler.schedule_jobs([cleanup_job])

> -

> - def _check_real_cache_size(self):

> - logpath = os.path.join(self._scheduler.context.logdir,

> 'cache_size.{pid}.log') - cache_size_job =

> CacheSizeJob(self._scheduler,

> - 'cache_size', logpath,

> - resources=[ResourceType.CACHE,

> - ResourceType.PROCESS],

> -

> exclusive_resources=[ResourceType.CACHE], -

> complete_cb=self._run_cleanup_if_required) -

> self._scheduler.schedule_jobs([cache_size_job])

> -

> def _check_cache_size(self, job, element):

> if not job.child_data:

> return

> @@ -87,7 +62,7 @@ class BuildQueue(Queue):

> cache._add_artifact_size(artifact_size)

>

> if cache.get_approximate_cache_size() >

> self._scheduler.context.cache_quota: -

> self._check_real_cache_size()

> + self._scheduler._check_cache_size_real()

>

> def done(self, job, element, result, success):

>

>

>

> =====================================

> buildstream/_scheduler/queues/fetchqueue.py

> =====================================

> @@ -23,7 +23,7 @@ from ... import Consistency

>

> # Local imports

> from . import Queue, QueueStatus

> -from .. import ResourceType

> +from ..resources import ResourceType

>

>

> # A queue which fetches element sources

> @@ -32,7 +32,7 @@ class FetchQueue(Queue):

>

> action_name = "Fetch"

> complete_name = "Fetched"

> - queue_type = ResourceType.DOWNLOAD

> + resources = [ResourceType.DOWNLOAD]

>

> def __init__(self, scheduler, skip_cached=False):

> super().__init__(scheduler)

>

>

> =====================================

> buildstream/_scheduler/queues/pullqueue.py

> =====================================

> @@ -20,7 +20,7 @@

>

> # Local imports

> from . import Queue, QueueStatus

> -from .. import ResourceType

> +from ..resources import ResourceType

>

>

> # A queue which pulls element artifacts

> @@ -29,7 +29,7 @@ class PullQueue(Queue):

>

> action_name = "Pull"

> complete_name = "Pulled"

> - queue_type = ResourceType.UPLOAD

> + resources = [ResourceType.UPLOAD]

>

> def process(self, element):

> # returns whether an artifact was downloaded or not

>

>

> =====================================

> buildstream/_scheduler/queues/pushqueue.py

> =====================================

> @@ -29,7 +29,7 @@ class PushQueue(Queue):

>

> action_name = "Push"

> complete_name = "Pushed"

> - queue_type = ResourceType.UPLOAD

> + resources = [ResourceType.UPLOAD]

>

> def process(self, element):

> # returns whether an artifact was uploaded or not

>

>

> =====================================

> buildstream/_scheduler/queues/queue.py

> =====================================

> @@ -75,13 +75,13 @@ class Queue():

> self._wait_queue = deque()

> self._done_queue = deque()

> self._max_retries = 0

> - if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD

> in self.resources: - self._max_retries =

> scheduler.context.sched_network_retries

>

> # Assert the subclass has setup class data

> assert self.action_name is not None

> assert self.complete_name is not None

> - assert self.resources is not []

> +

> + if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD

> in self.resources: + self._max_retries =

> scheduler.context.sched_network_retries

>

> #####################################################

> # Abstract Methods for Queue implementations #

> @@ -248,14 +248,11 @@ class Queue():

> return ready

>

> def peek_ready_jobs(self):

> - scheduler = self._scheduler

> -

> def ready(job):

> return self.status(job.element) == QueueStatus.READY

>

> yield from (job for job in self._wait_queue if ready(job))

>

> -

> #####################################################

> # Private Methods #

> #####################################################

>

>

> =====================================

> buildstream/_scheduler/queues/trackqueue.py

> =====================================

> @@ -33,7 +33,7 @@ class TrackQueue(Queue):

>

> action_name = "Track"

> complete_name = "Tracked"

> - queue_type = ResourceType.DOWNLOAD

> + resources = [ResourceType.DOWNLOAD]

>

> def process(self, element):

> return element._track()

>

>

> =====================================

> buildstream/_scheduler/resources.py

> =====================================

> @@ -34,7 +34,16 @@ class Resources():

> ResourceType.UPLOAD: set()

> }

>

> - def reserve_job_resources(self, job):

> + def clear_job_resources(self, job):

> + for resource in job.exclusive_resources:

> + self._exclusive_resources[resource].remove(hash(job))

> +

> + for resource in job.resources:

> + self._used_resources[resource] -= 1

> +

> + def reserve_exclusive_resources(self, job):

> + exclusive = job.exclusive_resources

> +

> # The very first thing we do is to register any exclusive

> # resources this job may want. Even if the job is not yet

> # allowed to run (because another job is holding the resource

> @@ -43,9 +52,10 @@ class Resources():

> # and no new jobs wanting these can be launched (except other

> # exclusive-access jobs).

> #

> - for resource in job.exclusive_resources:

> + for resource in exclusive:

> self._exclusive_resources[resource].add(hash(job))

>

> + def reserve_job_resources(self, job):

> # First, we check if the job wants to access a resource that

> # another job wants exclusive access to. If so, it cannot be

> # scheduled.

> @@ -81,8 +91,10 @@ class Resources():

> # Finally, we check if we have enough of each resource

> # available. If we don't have enough, the job cannot be

> # scheduled.

> - if self._max_resources[resource] >= self._used_resources[resource]:

> - return False

> + for resource in job.resources:

> + if (self._max_resources[resource] > 0 and

> + self._used_resources[resource] >=

> self._max_resources[resource]): + return False

>

> # Now we register the fact that our job is using the resources

> # it asked for, and tell the scheduler that it is allowed to

>

>

> =====================================

> buildstream/_scheduler/scheduler.py

> =====================================

> @@ -27,7 +27,8 @@ import datetime

> from contextlib import contextmanager

>

> # Local imports

> -from .resources import Resources

> +from .resources import Resources, ResourceType

> +from .jobs import CacheSizeJob, CleanupJob

>

>

> # A decent return code for Scheduler.run()

> @@ -231,7 +232,9 @@ class Scheduler():

> # success (bool): Whether the Job completed with a success status

> #

> def job_completed(self, job, success):

> + self._resources.clear_job_resources(job)

> self.active_jobs.remove(job)

> + self._job_complete_callback(job, success)

> self._schedule_queue_jobs()

> self._sched()

>

> @@ -245,6 +248,9 @@ class Scheduler():

> # automatically when Scheduler.run() is called initially,

> #

> def _sched(self):

> + for job in self.waiting_jobs:

> + self._resources.reserve_exclusive_resources(job)

> +

> for job in self.waiting_jobs:

> if not self._resources.reserve_job_resources(job):

> continue

> @@ -257,7 +263,7 @@ class Scheduler():

> self._job_start_callback(job)

>

> # If nothings ticking, time to bail out

> - if not self.active_jobs:

> + if not self.active_jobs and not self.waiting_jobs:

> self.loop.stop()

>

> # _schedule_queue_jobs()

> @@ -306,6 +312,33 @@ class Scheduler():

> self.schedule_jobs(ready)

> self._sched()

>

> + def _run_cleanup(self, cache_size):

> + self.context.info(cache_size)

> +

> + if cache_size and cache_size < self.context.cache_quota:

> + return

> +

> + logpath = os.path.join(self.context.logdir, 'cleanup.{pid}.log')

> + job = CleanupJob(self, 'cleanup', logpath,

> + resources=[ResourceType.CACHE,

> + ResourceType.PROCESS],

> + exclusive_resources=[ResourceType.CACHE],

> + complete_cb=None)

> +

> + self.schedule_jobs([job])

> +

> + def _check_cache_size_real(self):

> + self.context.info("Checking cache size")

> +

> + logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log')

> + job = CacheSizeJob(self, 'cache_size', logpath,

> + resources=[ResourceType.CACHE,

> + ResourceType.PROCESS],

> + exclusive_resources=[ResourceType.CACHE],

> + complete_cb=self._run_cleanup)

> +

> + self.schedule_jobs([job])

> +

> # _suspend_jobs()

> #

> # Suspend all ongoing jobs.

> @@ -397,6 +430,9 @@ class Scheduler():

> if not job.terminate_wait(timeout):

> job.kill()

>

> + # Clear out the waiting jobs

> + self.waiting_jobs = []

> +

> # Regular timeout for driving status in the UI

> def _tick(self):

> elapsed = self.elapsed_time()

>

>

>

> View it on GitLab:

> https://gitlab.com/BuildStream/buildstream/commit/a6a7a01f9c6e5ada863377554

> aa3b0bd7348079d

 

 

--

Agustín Benito Bethencourt

Principal Consultant

Codethink Ltd

We respect your privacy. See https://www.codethink.co.uk/privacy.html



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]