# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. from __future__ import division
  15. import math
  16. import multiprocessing.pool
  17. import six
  18. import sys
  19. import threading
  20. # Python 3 renamed Queue to queue
  21. try:
  22. import queue
  23. except ImportError:
  24. import Queue as queue
  25. _TASKS = multiprocessing.cpu_count()
  26. _output_lock = threading.Lock()
  27. def shard(items):
  28. """Breaks down the given items into roughly equal sized lists.
  29. The number of lists will be equal to the number of available processor cores.
  30. """
  31. if not items:
  32. return []
  33. n = int(math.ceil(len(items) / _TASKS))
  34. return _chunks(items, n)
  35. def _chunks(items, n):
  36. """Yield successive n-sized chunks from items."""
  37. for i in range(0, len(items), n):
  38. yield items[i:i + n]
  39. class Result(object):
  40. def __init__(self, num_errors, output):
  41. self.errors = num_errors
  42. self.output = six.ensure_text(output)
  43. @staticmethod
  44. def from_list(errors):
  45. return Result(len(errors), '\n'.join(errors))
  46. class Pool(object):
  47. def __init__(self):
  48. # Checkers submit tasks to be run and these are dropped in the _pending
  49. # queue. Workers process that queue and results are put in the _results
  50. # queue. _results is drained by the thread that calls join().
  51. self._pending = queue.Queue()
  52. self._results = queue.Queue()
  53. def worker():
  54. while True:
  55. task, args = self._pending.get()
  56. result = task(*args)
  57. if result is not None:
  58. self._results.put(result)
  59. self._pending.task_done()
  60. for i in range(_TASKS):
  61. t = threading.Thread(target=worker)
  62. t.daemon = True
  63. t.start()
  64. def submit(self, task, *args):
  65. """Submits a task for execution by the pool.
  66. Args:
  67. task: A callable routine that will perform the work.
  68. *args: A list of arguments to pass that routine.
  69. """
  70. self._pending.put((task, args))
  71. def join(self):
  72. """Waits for the completion of all submitted tasks.
  73. Returns:
  74. The number of errors encountered.
  75. """
  76. self._pending.join()
  77. num_errors = 0
  78. while not self._results.empty():
  79. result = self._results.get()
  80. num_errors += result.errors
  81. sys.stdout.write(result.output)
  82. self._results.task_done()
  83. self._results.join()
  84. return num_errors
  85. def exit(self):
  86. """Waits for the completion of the submitted tasks and exits.
  87. This calls join() and then exits with a 0 status code if there were no
  88. errors, or 1 if there were.
  89. """
  90. errors = self.join()
  91. sys.exit(errors > 0)