Hi,
Please see below what GitLab can send to the mailing list by default. The interesting options available are:
* Whether or not to include the diffs.
* Whether to include more than one recipient. In this case we are talking about the mailing list.
* More information about the feature: https://docs.gitlab.com/ee/user/project/integrations/emails_on_push.html
As you can see, these mails can be easily filtered, since the Subject starts with
[Git][Group/project]
Please let me know if this is what you want, so that I can activate it.
Further functionality requires the use of webhooks. I would need the help of an engineer to do anything more complex or customized.
######
> Tristan Maat pushed to branch 135-expire-artifacts-in-local-cache at > BuildStream / buildstream > > > Commits: > a6a7a01f by Tristan Maat at 2018-07-10T14:25:44Z > _context.py: Add simpler message handlers > > - - - - - > > > 13 changed files: > > - buildstream/_artifactcache/artifactcache.py > - buildstream/_context.py > - buildstream/_scheduler/jobs/cachesizejob.py > - buildstream/_scheduler/jobs/cleanupjob.py > - buildstream/_scheduler/jobs/job.py > - buildstream/_scheduler/queues/buildqueue.py > - buildstream/_scheduler/queues/fetchqueue.py > - buildstream/_scheduler/queues/pullqueue.py > - buildstream/_scheduler/queues/pushqueue.py > - buildstream/_scheduler/queues/queue.py > - buildstream/_scheduler/queues/trackqueue.py > - buildstream/_scheduler/resources.py > - buildstream/_scheduler/scheduler.py > > > Changes: > > ===================================== > buildstream/_artifactcache/artifactcache.py > ===================================== > @@ -217,8 +217,10 @@ class ArtifactCache(): > default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'], > 'buildstream.conf') > detail = ("There is not enough space to build the given > element.\n" - "Please increase the cache-quota in > {}." - .format(self.context.config_origin or > default_conf)) + "Please increase the cache-quota > in {}.\n" + "Space usage: {:,} B/{:,} B" > + .format(self.context.config_origin or > default_conf, + > self.calculate_cache_size(), self.context.cache_quota)) > > if self.calculate_cache_size() > self.context.cache_quota: > raise ArtifactError("Cache too full. 
Aborting.", > > > ===================================== > buildstream/_context.py > ===================================== > @@ -393,6 +393,30 @@ class Context(): > self._message_handler(message, context=self) > return > > + def msg(self, text, *, plugin=None, msg_type=None, **kwargs): > + self.message(Message(plugin, msg_type, str(text), **kwargs)) > + > + def debug(self, text, *, plugin=None, **kwargs): > + self.msg(text, plugin=plugin, msg_type=MessageType.DEBUG, **kwargs) > + > + def status(self, text, *, plugin=None, **kwargs): > + self.msg(text, plugin=plugin, msg_type=MessageType.STATUS, > **kwargs) + > + def info(self, text, *, plugin=None, **kwargs): > + self.msg(text, plugin=plugin, msg_type=MessageType.INFO, **kwargs) > + > + def warn(self, text, *, plugin=None, **kwargs): > + self.msg(text, plugin=plugin, msg_type=MessageType.WARN, **kwargs) > + > + def error(self, text, *, plugin=None, **kwargs): > + self.msg(text, plugin=plugin, msg_type=MessageType.ERROR, **kwargs) > + > + def bug(self, text, *, plugin=None, **kwargs): > + self.msg(text, plugin=plugin, msg_type=MessageType.BUG, **kwargs) > + > + def log(self, text, *, plugin=None, **kwargs): > + self.msg(text, plugin=plugin, msg_type=MessageType.LOG, **kwargs) > + > # silence() > # > # A context manager to silence messages, this behaves in > > > ===================================== > buildstream/_scheduler/jobs/cachesizejob.py > ===================================== > @@ -35,7 +35,8 @@ class CacheSizeJob(Job): > > def _parent_complete(self, success, result): > self._cache._set_cache_size(result) > - self._complete_cb(result) > + if self._complete_cb: > + self._complete_cb(result) > > @contextmanager > def _child_logging_enabled(self, logfile): > > > ===================================== > buildstream/_scheduler/jobs/cleanupjob.py > ===================================== > @@ -35,7 +35,8 @@ class CleanupJob(Job): > > def _parent_complete(self, success, result): > self._cache._set_cache_size(result) > - 
self._complete_cb() > + if self._complete_cb: > + self._complete_cb() > > @contextmanager > def _child_logging_enabled(self, logfile): > > > ===================================== > buildstream/_scheduler/jobs/job.py > ===================================== > @@ -77,17 +77,16 @@ class Job(): > resources=None, exclusive_resources=None, max_retries=0): > > if resources is None: > - resources = [] > + resources = set() > else: > resources = set(resources) > if exclusive_resources is None: > - exclusive_resources = [] > + exclusive_resources = set() > else: > - resources = set(resources) > + exclusive_resources = set(resources) > > # Ensure nobody tries not use an exclusive resource. > - assert(exclusive_resources <= resources, > - "All exclusive resources must also be resources!") > + assert exclusive_resources <= resources, "All exclusive resources > must also be resources!" > > # > # Public members > > > ===================================== > buildstream/_scheduler/queues/buildqueue.py > ===================================== > @@ -18,10 +18,8 @@ > # Tristan Van Berkom <tristan vanberkom codethink co uk> > # Jürg Billeter <juerg billeter codethink co uk> > > -import os > from . import Queue, QueueStatus > -from .. 
import ResourceType > -from ..jobs import CacheSizeJob, CleanupJob > +from ..resources import ResourceType > > > # A queue which assembles elements > @@ -30,7 +28,7 @@ class BuildQueue(Queue): > > action_name = "Build" > complete_name = "Built" > - queue_type = ResourceType.PROCESS > + resources = [ResourceType.PROCESS] > > def process(self, element): > element._assemble() > @@ -53,29 +51,6 @@ class BuildQueue(Queue): > > return QueueStatus.READY > > - def _run_cleanup_if_required(self, cache_size): > - if cache_size < self._scheduler.context.cache_quota: > - return > - > - logpath = os.path.join(self._scheduler.context.logdir, > 'cleanup.{pid}.log') - cleanup_job = CleanupJob(self._scheduler, > - 'clean', logpath, > - resources=[ResourceType.CACHE, > - ResourceType.PROCESS], > - exclusive_resources=[ResourceType.CACHE], > - complete_cb=lambda: 1) > - self._scheduler.schedule_jobs([cleanup_job]) > - > - def _check_real_cache_size(self): > - logpath = os.path.join(self._scheduler.context.logdir, > 'cache_size.{pid}.log') - cache_size_job = > CacheSizeJob(self._scheduler, > - 'cache_size', logpath, > - resources=[ResourceType.CACHE, > - ResourceType.PROCESS], > - > exclusive_resources=[ResourceType.CACHE], - > complete_cb=self._run_cleanup_if_required) - > self._scheduler.schedule_jobs([cache_size_job]) > - > def _check_cache_size(self, job, element): > if not job.child_data: > return > @@ -87,7 +62,7 @@ class BuildQueue(Queue): > cache._add_artifact_size(artifact_size) > > if cache.get_approximate_cache_size() > > self._scheduler.context.cache_quota: - > self._check_real_cache_size() > + self._scheduler._check_cache_size_real() > > def done(self, job, element, result, success): > > > > ===================================== > buildstream/_scheduler/queues/fetchqueue.py > ===================================== > @@ -23,7 +23,7 @@ from ... import Consistency > > # Local imports > from . import Queue, QueueStatus > -from .. 
import ResourceType > +from ..resources import ResourceType > > > # A queue which fetches element sources > @@ -32,7 +32,7 @@ class FetchQueue(Queue): > > action_name = "Fetch" > complete_name = "Fetched" > - queue_type = ResourceType.DOWNLOAD > + resources = [ResourceType.DOWNLOAD] > > def __init__(self, scheduler, skip_cached=False): > super().__init__(scheduler) > > > ===================================== > buildstream/_scheduler/queues/pullqueue.py > ===================================== > @@ -20,7 +20,7 @@ > > # Local imports > from . import Queue, QueueStatus > -from .. import ResourceType > +from ..resources import ResourceType > > > # A queue which pulls element artifacts > @@ -29,7 +29,7 @@ class PullQueue(Queue): > > action_name = "Pull" > complete_name = "Pulled" > - queue_type = ResourceType.UPLOAD > + resources = [ResourceType.UPLOAD] > > def process(self, element): > # returns whether an artifact was downloaded or not > > > ===================================== > buildstream/_scheduler/queues/pushqueue.py > ===================================== > @@ -29,7 +29,7 @@ class PushQueue(Queue): > > action_name = "Push" > complete_name = "Pushed" > - queue_type = ResourceType.UPLOAD > + resources = [ResourceType.UPLOAD] > > def process(self, element): > # returns whether an artifact was uploaded or not > > > ===================================== > buildstream/_scheduler/queues/queue.py > ===================================== > @@ -75,13 +75,13 @@ class Queue(): > self._wait_queue = deque() > self._done_queue = deque() > self._max_retries = 0 > - if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD > in self.resources: - self._max_retries = > scheduler.context.sched_network_retries > > # Assert the subclass has setup class data > assert self.action_name is not None > assert self.complete_name is not None > - assert self.resources is not [] > + > + if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD > in self.resources: + 
self._max_retries = > scheduler.context.sched_network_retries > > ##################################################### > # Abstract Methods for Queue implementations # > @@ -248,14 +248,11 @@ class Queue(): > return ready > > def peek_ready_jobs(self): > - scheduler = self._scheduler > - > def ready(job): > return self.status(job.element) == QueueStatus.READY > > yield from (job for job in self._wait_queue if ready(job)) > > - > ##################################################### > # Private Methods # > ##################################################### > > > ===================================== > buildstream/_scheduler/queues/trackqueue.py > ===================================== > @@ -33,7 +33,7 @@ class TrackQueue(Queue): > > action_name = "Track" > complete_name = "Tracked" > - queue_type = ResourceType.DOWNLOAD > + resources = [ResourceType.DOWNLOAD] > > def process(self, element): > return element._track() > > > ===================================== > buildstream/_scheduler/resources.py > ===================================== > @@ -34,7 +34,16 @@ class Resources(): > ResourceType.UPLOAD: set() > } > > - def reserve_job_resources(self, job): > + def clear_job_resources(self, job): > + for resource in job.exclusive_resources: > + self._exclusive_resources[resource].remove(hash(job)) > + > + for resource in job.resources: > + self._used_resources[resource] -= 1 > + > + def reserve_exclusive_resources(self, job): > + exclusive = job.exclusive_resources > + > # The very first thing we do is to register any exclusive > # resources this job may want. Even if the job is not yet > # allowed to run (because another job is holding the resource > @@ -43,9 +52,10 @@ class Resources(): > # and no new jobs wanting these can be launched (except other > # exclusive-access jobs). 
> # > - for resource in job.exclusive_resources: > + for resource in exclusive: > self._exclusive_resources[resource].add(hash(job)) > > + def reserve_job_resources(self, job): > # First, we check if the job wants to access a resource that > # another job wants exclusive access to. If so, it cannot be > # scheduled. > @@ -81,8 +91,10 @@ class Resources(): > # Finally, we check if we have enough of each resource > # available. If we don't have enough, the job cannot be > # scheduled. > - if self._max_resources[resource] >= self._used_resources[resource]: > - return False > + for resource in job.resources: > + if (self._max_resources[resource] > 0 and > + self._used_resources[resource] >= > self._max_resources[resource]): + return False > > # Now we register the fact that our job is using the resources > # it asked for, and tell the scheduler that it is allowed to > > > ===================================== > buildstream/_scheduler/scheduler.py > ===================================== > @@ -27,7 +27,8 @@ import datetime > from contextlib import contextmanager > > # Local imports > -from .resources import Resources > +from .resources import Resources, ResourceType > +from .jobs import CacheSizeJob, CleanupJob > > > # A decent return code for Scheduler.run() > @@ -231,7 +232,9 @@ class Scheduler(): > # success (bool): Whether the Job completed with a success status > # > def job_completed(self, job, success): > + self._resources.clear_job_resources(job) > self.active_jobs.remove(job) > + self._job_complete_callback(job, success) > self._schedule_queue_jobs() > self._sched() > > @@ -245,6 +248,9 @@ class Scheduler(): > # automatically when Scheduler.run() is called initially, > # > def _sched(self): > + for job in self.waiting_jobs: > + self._resources.reserve_exclusive_resources(job) > + > for job in self.waiting_jobs: > if not self._resources.reserve_job_resources(job): > continue > @@ -257,7 +263,7 @@ class Scheduler(): > self._job_start_callback(job) > > # If 
nothings ticking, time to bail out > - if not self.active_jobs: > + if not self.active_jobs and not self.waiting_jobs: > self.loop.stop() > > # _schedule_queue_jobs() > @@ -306,6 +312,33 @@ class Scheduler(): > self.schedule_jobs(ready) > self._sched() > > + def _run_cleanup(self, cache_size): > + self.context.info(cache_size) > + > + if cache_size and cache_size < self.context.cache_quota: > + return > + > + logpath = os.path.join(self.context.logdir, 'cleanup.{pid}.log') > + job = CleanupJob(self, 'cleanup', logpath, > + resources=[ResourceType.CACHE, > + ResourceType.PROCESS], > + exclusive_resources=[ResourceType.CACHE], > + complete_cb=None) > + > + self.schedule_jobs([job]) > + > + def _check_cache_size_real(self): > + self.context.info("Checking cache size") > + > + logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log') > + job = CacheSizeJob(self, 'cache_size', logpath, > + resources=[ResourceType.CACHE, > + ResourceType.PROCESS], > + exclusive_resources=[ResourceType.CACHE], > + complete_cb=self._run_cleanup) > + > + self.schedule_jobs([job]) > + > # _suspend_jobs() > # > # Suspend all ongoing jobs. > @@ -397,6 +430,9 @@ class Scheduler(): > if not job.terminate_wait(timeout): > job.kill() > > + # Clear out the waiting jobs > + self.waiting_jobs = [] > + > # Regular timeout for driving status in the UI > def _tick(self): > elapsed = self.elapsed_time() > > > > View it on GitLab: > https://gitlab.com/BuildStream/buildstream/commit/a6a7a01f9c6e5ada863377554 > aa3b0bd7348079d
-- Agustín Benito Bethencourt Principal Consultant Codethink Ltd We respect your privacy. See https://www.codethink.co.uk/privacy.html |