# checker.py
# Copyright 2019 Google
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import math
import multiprocessing.pool
import sys
import threading

# Python 3 renamed Queue to queue
try:
    import queue
except ImportError:
    import Queue as queue

# One worker thread per available processor core.
_TASKS = multiprocessing.cpu_count()
# Lock for serializing writes to shared output.
# NOTE(review): defined but never used in this file — presumably taken by
# checker tasks defined elsewhere; confirm before removing.
_output_lock = threading.Lock()
  26. def shard(items):
  27. """Breaks down the given items into roughly equal sized lists.
  28. The number of lists will be equal to the number of available processor cores.
  29. """
  30. if not items:
  31. return []
  32. n = int(math.ceil(len(items) / _TASKS))
  33. return _chunks(items, n)
  34. def _chunks(items, n):
  35. """Yield successive n-sized chunks from items."""
  36. for i in range(0, len(items), n):
  37. yield items[i:i + n]
  38. class Result(object):
  39. def __init__(self, num_errors, output):
  40. self.errors = num_errors
  41. self.output = output
  42. @staticmethod
  43. def from_list(errors):
  44. return Result(len(errors), '\n'.join(errors))
class Pool(object):
    """A fixed-size pool of daemon worker threads, one per processor core."""

    def __init__(self):
        # Checkers submit tasks to be run and these are dropped in the _pending
        # queue. Workers process that queue and results are put in the _results
        # queue. _results is drained by the thread that calls join().
        self._pending = queue.Queue()
        self._results = queue.Queue()

        def worker():
            # Runs forever; threads are daemonized below so they do not
            # block interpreter exit.
            while True:
                task, args = self._pending.get()
                result = task(*args)
                # Ordering matters: the result must be enqueued BEFORE
                # task_done() is called, so that once _pending.join()
                # returns in join(), every produced result is already
                # visible in _results.
                if result is not None:
                    self._results.put(result)
                self._pending.task_done()

        for i in range(_TASKS):
            t = threading.Thread(target=worker)
            t.daemon = True
            t.start()

    def submit(self, task, *args):
        """Submits a task for execution by the pool.

        Args:
          task: A callable routine that will perform the work.
          *args: A list of arguments to pass that routine.
        """
        self._pending.put((task, args))

    def join(self):
        """Waits for the completion of all submitted tasks.

        Returns:
          The number of errors encountered.
        """
        self._pending.join()
        num_errors = 0
        # Polling empty() is safe here: workers enqueue results before
        # marking their task done, so after _pending.join() no further
        # results can arrive.
        while not self._results.empty():
            result = self._results.get()
            num_errors += result.errors
            sys.stdout.write(result.output)
            self._results.task_done()
        self._results.join()
        return num_errors

    def exit(self):
        """Waits for the completion of the submitted tasks and exits.

        This calls join() and then exits with a 0 status code if there were no
        errors, or 1 if there were.
        """
        errors = self.join()
        # sys.exit(True) exits with status 1, sys.exit(False) with 0.
        sys.exit(errors > 0)