Library
Powerhose is organized into a simple hierarchy of classes and a few functions:
- get_cluster() – a function that creates a whole cluster.
- Job – A class that holds a job to be performed.
- Broker – A class that passes the jobs it receives to workers.
- Worker – A class that connects to a broker and passes the jobs it receives
to a Python callable.
- Heartbeat and Stethoscope – classes that implement a heartbeat.
- Client – A class that connects to a broker and lets you run jobs against
it.
get_cluster
The get_cluster() function creates a Broker and several
Worker instances. It can be run in the background for convenience.
powerhose.get_cluster(target, numprocesses=5, frontend='ipc:///tmp/powerhose-front.ipc', backend='ipc:///tmp/powerhose-back.ipc', heartbeat='ipc:///tmp/powerhose-beat.ipc', register='ipc:///tmp/powerhose-reg.ipc', working_dir='.', logfile='stdout', debug=False, background=False, worker_params=None, timeout=7.5, max_age=-1, max_age_delta=0)
Runs a Powerhose cluster.
Options:
- target: The Python callable that will be called when the broker
receives a job.
- numprocesses: The number of workers. Defaults to 5.
- frontend: the ZMQ socket to receive jobs.
- backend: the ZMQ socket to communicate with workers.
- register: the ZMQ socket to register workers.
- heartbeat: the ZMQ socket on which heartbeats are published.
- working_dir: The working directory. Defaults to ".".
- logfile: The file to log into. Defaults to stdout.
- debug: If True, the logs are at the DEBUG level. Defaults to False.
- background: If True, the cluster is run in the background.
Defaults to False.
- worker_params: a dict of params to pass to the worker. Defaults to
None.
- timeout: the maximum time allowed before the thread stacks are dumped
and the job result is not sent back. Defaults to 7.5s.
- max_age: maximum age for a worker in seconds. After that delay,
the worker will simply quit. When set to -1, never quits.
Defaults to -1.
- max_age_delta: maximum value in seconds added to max_age.
The Worker will quit after max_age + random(0, max_age_delta).
This is done to avoid having all workers quit at the same instant.
Defaults to 0.
Example:
from powerhose import get_cluster
from powerhose.client import Client
cluster = get_cluster('example.echo', background=True)
cluster.start()
client = Client()
for i in range(10):
    print(client.execute(str(i)))
cluster.stop()
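The example passes the target as the dotted string 'example.echo', that is, a callable named echo in an importable module named example. The sketch below shows one way such a dotted name can be resolved with importlib, and what an echo target might look like. resolve_target and echo are hypothetical; this is not Powerhose's own loader, and it assumes the worker calls the target with the received Job and sends back the returned string.

```python
import importlib


def resolve_target(dotted_name):
    """Import the module part of 'module.callable' and return the callable (hypothetical helper)."""
    module_name, _, attr = dotted_name.rpartition(".")
    if not module_name:
        raise ValueError("expected a dotted name such as 'example.echo': %r" % (dotted_name,))
    module = importlib.import_module(module_name)
    return getattr(module, attr)


def echo(job):
    # Assumption: the worker passes the received Job to the target and
    # sends back the returned string as the job result.
    return job.data
```

Since no example module is assumed here, resolve_target("json.dumps") is an easy way to try it: it returns the standard json.dumps function.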
Job
class powerhose.job.Job(data='', headers=None)
A Job is just a container that’s passed over the wire.
A job is composed of headers and raw data, and offers serialization.
Options:
- data: the raw string data (default: '')
- headers: a mapping of headers (default: None)
Adds a header.
Options:
- name: header name
- value: value
Both values should be strings. If the header already exists
it’s overwritten.
serialize()
Serializes the job.
The output can be sent over a wire. A serialized job
can be read with a cursor with no specific preprocessing.
classmethod load_from_string(data)
Loads a job from a serialized string and returns a Job instance.
Options:
- data: serialized string.
Example:
>>> from powerhose.job import Job
>>> job = Job('4*2')
>>> job.serialize()
'NONE:::4*2'
>>> Job.load_from_string('NONE:::4*2')
<powerhose.job.Job object at 0x107b78c50>
>>> Job.load_from_string('NONE:::4*2').data
'4*2'
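The doctest shows that a job without headers serializes to NONE:::4*2: a header section, the ::: separator, then the raw data. The toy model below only illustrates that round trip. The NONE placeholder and the ::: separator come from the example above, but the name=value;... header encoding is an assumption, not Powerhose's actual wire format, and ToyJob is hypothetical. In this model, a header containing ':::', ';' or '=' would break parsing.

```python
class ToyJob(object):
    """Toy model of a job: raw string data plus string headers (hypothetical)."""

    SEPARATOR = ":::"

    def __init__(self, data="", headers=None):
        self.data = data
        self.headers = {}
        for name, value in (headers or {}).items():
            self.add_header(name, value)

    def add_header(self, name, value):
        # Both values are strings; an existing header is overwritten.
        self.headers[name] = value

    def serialize(self):
        if self.headers:
            # Assumed encoding: name=value pairs joined by ';', sorted for stable output.
            head = ";".join("%s=%s" % item for item in sorted(self.headers.items()))
        else:
            head = "NONE"  # placeholder seen in the doctest
        return head + self.SEPARATOR + self.data

    @classmethod
    def load_from_string(cls, data):
        # Split on the first separator only, so the payload may itself contain ':::'.
        head, _, payload = data.partition(cls.SEPARATOR)
        headers = {}
        if head != "NONE":
            for pair in head.split(";"):
                name, _, value = pair.partition("=")
                headers[name] = value
        return cls(payload, headers)
```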
Broker
Worker
class powerhose.worker.Worker(target, backend='ipc:///tmp/powerhose-back.ipc', heartbeat='ipc:///tmp/powerhose-beat.ipc', register='ipc:///tmp/powerhose-reg.ipc', ping_delay=10.0, ping_retries=3, params=None, timeout=7.5, max_age=-1, max_age_delta=0)
Class that links a callable to a broker.
Options:
- target: The Python callable that will be called when the broker
sends a job.
- backend: The ZMQ socket to connect to the broker.
- heartbeat: The ZMQ socket to perform PINGs on the broker to make
sure it’s still alive.
- register: the ZMQ socket to register workers.
- ping_delay: the delay in seconds between two pings.
- ping_retries: the number of attempts to ping the broker before
quitting.
- params: a dict containing the params to set for this worker.
- timeout: the maximum time allowed before the thread stacks are dumped
and the job result is not sent back. Defaults to 7.5s.
- max_age: maximum age for a worker in seconds. After that delay,
the worker will simply quit. When set to -1, never quits.
Defaults to -1.
- max_age_delta: maximum value in seconds added to max_age.
The Worker will quit after max_age + random(0, max_age_delta).
This is done to avoid having all workers quit at the same instant.
Defaults to 0. The value must be an integer.
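The max_age / max_age_delta rule reads as: quit after max_age + random(0, max_age_delta) seconds, or never when max_age is -1. Below is a small sketch of that computation. quit_deadline is a hypothetical helper, and drawing the random part with random.randint (inclusive at both ends, consistent with max_age_delta being an integer) is an assumption about how the jitter is chosen.

```python
import random


def quit_deadline(start, max_age=-1, max_age_delta=0, rng=random):
    """Return when a worker started at `start` should quit, or None for never.

    Hypothetical helper mirroring the documented rule:
    quit after max_age + random(0, max_age_delta) seconds; -1 means never.
    """
    if max_age == -1:
        return None
    # Spread deadlines so that workers started together do not all quit at once.
    jitter = rng.randint(0, max_age_delta) if max_age_delta > 0 else 0
    return start + max_age + jitter
```

With max_age=60 and max_age_delta=10, workers started at t=0 get deadlines anywhere from 60 to 70 seconds, so they restart gradually instead of all at once.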
start()
Starts the worker.
stop()
Stops the worker.
Heartbeat
The Broker class runs a Heartbeat instance that regularly
sends a BEAT message on a PUB channel. Each worker has a Stethoscope
instance that subscribes to the channel, to check if the Broker is still
around.
class powerhose.heartbeat.Heartbeat(endpoint='ipc:///tmp/powerhose-beat.ipc', interval=10.0, io_loop=None, ctx=None)
Class that implements a ZMQ heartbeat server.
This class sends regular beats on a ZMQ socket.
Options:
- endpoint: The ZMQ socket the beats are published on.
- interval: Interval in seconds between two beats. Defaults to 10s.
stop()
Stops the heartbeat (Pong) service.
start()
Starts the heartbeat (Pong) service.
class powerhose.heartbeat.Stethoscope(endpoint='ipc:///tmp/powerhose-beat.ipc', warmup_delay=0.5, delay=10.0, retries=3, onbeatlost=None, onbeat=None, io_loop=None, ctx=None)
Class that implements a ZMQ heartbeat client.
Options:
- endpoint: The ZMQ socket to subscribe to.
- warmup_delay: The delay in seconds before starting to ping. Defaults to 0.5s.
- delay: The delay in seconds between two pings. Defaults to 10s.
- retries: The number of attempts to ping. Defaults to 3.
- onbeatlost: a callable that will be called when a ping fails.
If the callable returns True, the Stethoscope stops. Defaults to None.
- onbeat: a callable that will be called when a ping succeeds.
Defaults to None.
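How retries, onbeat and onbeatlost interact can be modeled without sockets. The sketch below is a deterministic simulation, not Powerhose's implementation. It assumes a check runs every delay seconds, that a check fails when no beat arrived since the previous one, that onbeatlost is only called once retries consecutive checks have failed, and that a True return value stops the watcher. BeatWatcher and its methods are hypothetical.

```python
class BeatWatcher(object):
    """Socket-free model of a heartbeat client's retry logic (hypothetical)."""

    def __init__(self, retries=3, onbeatlost=None, onbeat=None):
        self.retries = retries
        self.onbeatlost = onbeatlost
        self.onbeat = onbeat
        self.missed = 0          # consecutive checks that saw no beat
        self.got_beat = False    # has a beat arrived since the last check?
        self.running = True

    def beat(self):
        # In a real client this would run when a BEAT message arrives on the SUB socket.
        self.got_beat = True

    def check(self):
        # In a real client a timer would call this every `delay` seconds,
        # starting after `warmup_delay`.
        if not self.running:
            return
        if self.got_beat:
            self.got_beat = False
            self.missed = 0
            if self.onbeat is not None:
                self.onbeat()
            return
        self.missed += 1
        if self.missed >= self.retries and self.onbeatlost is not None:
            # Assumed: a True return value stops the watcher.
            if self.onbeatlost():
                self.running = False
```

With retries=3, one beat followed by three silent checks triggers onbeatlost on the third silent check.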
stop()
Stops the Stethoscope (pinger).
Client
class powerhose.client.Client(frontend='ipc:///tmp/powerhose-front.ipc', timeout=5.0, timeout_max_overflow=7.5, timeout_overflows=1, debug=False, ctx=None)
Class to call a Powerhose cluster.
Options:
- frontend: ZMQ socket to call.
- timeout: maximum allowed time for a job to run.
Defaults to 5s.
- timeout_max_overflow: maximum timeout overflow allowed.
Defaults to 7.5s.
- timeout_overflows: number of times in a row the timeout value
can be overflowed per worker. The client keeps a counter of
executions that were longer than the regular timeout but shorter
than timeout_max_overflow. When the number goes over
timeout_overflows, the usual TimeoutError is raised.
When a worker returns on time, the counter is reset.
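The overflow rule is easiest to follow as a small per-worker state machine. The sketch below assumes each result's duration is known when it comes back. OverflowTracker and record are hypothetical names, and Python 3's built-in TimeoutError stands in for the exception the client raises.

```python
class OverflowTracker(object):
    """Per-worker bookkeeping for the timeout overflow rule (hypothetical)."""

    def __init__(self, timeout=5.0, timeout_max_overflow=7.5, timeout_overflows=1):
        self.timeout = timeout
        self.timeout_max_overflow = timeout_max_overflow
        self.timeout_overflows = timeout_overflows
        self.counters = {}  # worker id -> consecutive overflows

    def record(self, worker, duration):
        """Accept a result that took `duration` seconds, or raise TimeoutError."""
        if duration <= self.timeout:
            # Returned on time: reset this worker's counter.
            self.counters[worker] = 0
            return
        if duration > self.timeout_max_overflow:
            raise TimeoutError("%s took %.1fs" % (worker, duration))
        # Between timeout and timeout_max_overflow: tolerated a limited number of times.
        count = self.counters.get(worker, 0) + 1
        self.counters[worker] = count
        if count > self.timeout_overflows:
            raise TimeoutError("%s overflowed %d times in a row" % (worker, count))
```

With the defaults, a worker that takes 6s once is tolerated. A second 6s run in a row raises, unless an on-time result resets the counter in between.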
execute(job, timeout=None)
Runs the job.
Options:
- job: Job to be performed. Can be a Job
instance or a string. If it’s a string a Job instance
will be automatically created out of it.
- timeout: maximum allowed time for a job to run.
If not provided, uses the one defined in the constructor.
If the job does not complete in time, a TimeoutError is raised.
This method is thread-safe and uses a lock. If you need to execute a
lot of jobs simultaneously on a broker, use the Pool class.
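The sketch below shows two behaviours described for execute(): wrapping a string in a job, and serializing concurrent calls with a lock. LockedClient, SimpleJob and the send callable are hypothetical stand-ins. The real client talks to the broker over ZMQ, which is replaced here by a plain function.

```python
import threading


class SimpleJob(object):
    """Stand-in for a job object that only carries raw data (hypothetical)."""

    def __init__(self, data=""):
        self.data = data


class LockedClient(object):
    """Hypothetical client showing string coercion and lock-based thread safety."""

    def __init__(self, send, timeout=5.0):
        self._send = send            # callable(job, timeout) -> result; replaces the ZMQ round trip
        self.timeout = timeout
        self._lock = threading.Lock()

    def execute(self, job, timeout=None):
        if isinstance(job, str):
            job = SimpleJob(job)     # strings are wrapped automatically
        if timeout is None:
            timeout = self.timeout   # fall back to the constructor's value
        with self._lock:             # only one request in flight per client
            return self._send(job, timeout)
```

Because the lock allows a single request in flight per client, concurrent throughput has to come from several client instances, which is what the Pool class is for.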
ping(timeout=1.0)
Pings the broker to make sure it’s responsive.
Returns the broker PID.
Client Pool
class powerhose.client.Pool(size=10, frontend='ipc:///tmp/powerhose-front.ipc', timeout=5.0, timeout_max_overflow=7.5, timeout_overflows=1, debug=False, ctx=None)
The Pool class manages several Client instances
and exposes the same interface.
Options:
- size: size of the pool. Defaults to 10.
- frontend: ZMQ socket to call.
- timeout: maximum allowed time for a job to run.
Defaults to 5s.
- timeout_max_overflow: maximum timeout overflow allowed.
Defaults to 7.5s.
- timeout_overflows: number of times in a row the timeout value
can be overflowed per worker. The client keeps a counter of
executions that were longer than the regular timeout but shorter
than timeout_max_overflow. When the number goes over
timeout_overflows, the usual TimeoutError is raised.
When a worker returns on time, the counter is reset.
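Below is a sketch of the pooling idea. A fixed set of clients sits in a thread-safe queue, and each call borrows one client for its duration, so up to size jobs run concurrently through the same execute() interface. ToyPool, UpperClient and the factory argument are hypothetical. Any object with an execute(job, timeout=None) method would work in place of UpperClient.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


class ToyPool(object):
    """Hypothetical pool exposing the same execute() interface as a single client."""

    def __init__(self, factory, size=10):
        self._clients = queue.Queue()
        for _ in range(size):
            self._clients.put(factory())  # one independent client per slot

    def execute(self, job, timeout=None):
        client = self._clients.get()      # blocks until a client is free
        try:
            return client.execute(job, timeout=timeout)
        finally:
            self._clients.put(client)     # always hand the client back


class UpperClient(object):
    """Stand-in client used only for this demo."""

    def execute(self, job, timeout=None):
        return job.upper()


pool = ToyPool(UpperClient, size=3)
with ThreadPoolExecutor(max_workers=6) as executor:
    # At most 3 jobs run at once; executor.map keeps results in input order.
    results = list(executor.map(pool.execute, ["a", "b", "c", "d"]))
```

Returning the client in a finally clause keeps the pool at full size even when a job raises, for example on a timeout.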