wingfoil.wingfoil
Wingfoil is a blazingly fast, highly scalable stream processing framework designed for latency-critical use cases such as electronic trading and real-time AI systems.
- class Node
Bases: object
- count()
Counts how many times the upstream node has ticked.
- run(realtime, start=None, duration=None, cycles=None)
- class PrometheusExporter(addr)
Bases: object
Serves a Prometheus-compatible GET /metrics endpoint.
- Usage:
exporter = PrometheusExporter("0.0.0.0:9091")
port = exporter.serve()
node = exporter.register("my_metric", stream)
- register(name, stream)
Register a stream as a named gauge metric. Returns a Node.
- serve()
Start the HTTP server. Returns the port that was bound.
- class Stream(inner)
Bases: object
- average()
- buffer(capacity)
- collect()
- count()
- csv_write(path)
Write this stream of dicts to a CSV file.
Each dict becomes one CSV row. Headers are inferred from the first dict’s keys, and a time column is prepended with the graph time in nanoseconds.
- Parameters:
path – Output file path
- Returns:
A Node that drives the write operation.
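The file layout described above can be previewed without running a graph. The following is a hypothetical pure-Python model built on the standard csv module, not wingfoil code. It takes (graph_time_ns, dict) pairs and assumes the prepended column is named "time", which the docs do not specify.

```python
import csv
import io


def csv_write_model(ticks):
    """Hypothetical model of the csv_write layout: one row per (graph_time_ns, dict) tick."""
    out = io.StringIO()
    writer = None
    for time_ns, row in ticks:
        if writer is None:
            # Headers come from the first dict; the "time" column name is an assumption.
            writer = csv.DictWriter(out, fieldnames=["time", *row.keys()], lineterminator="\n")
            writer.writeheader()
        # DictWriter raises ValueError if this row has a key the first row lacked.
        writer.writerow({"time": time_ns, **row})
    return out.getvalue()
```

How csv_write itself handles later dicts whose keys differ from the first is not documented here.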
- dataframe()
- delay(delay_secs)
Propagates its source delayed by the specified duration, given in seconds (delay_secs).
- difference()
Computes the difference in its source from one cycle to the next (pass-through of PyElement).
- distinct()
Only propagates its source when its value has changed (compared using PartialEq on PyElement).
- etcd_pub(endpoint, lease_ttl=None, force=True)
Publish this stream of dicts to etcd via PUT.
Stream values must be dicts with “key” (str) and “value” (bytes), or lists of such dicts for multiple writes per tick.
- Parameters:
endpoint – etcd endpoint, e.g. “http://localhost:2379”
lease_ttl – optional lease TTL in seconds; keys expire after this duration and vanish immediately on clean shutdown. Pass None for persistent keys (default).
force – if True (default), silently overwrite existing keys. If False, fail if any key already exists.
- Returns:
A Node that drives the write operation.
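Here is a minimal sketch of building values in the documented shape. It assumes each tick carries a hypothetical dict of prices and uses an illustrative "/prices/" key prefix. The helper returns a list so that one tick produces several PUTs, and a second helper checks the key/value contract.

```python
def etcd_puts(prices, prefix="/prices/"):
    """Hypothetical helper: turn {symbol: price} into a list of etcd_pub write dicts."""
    return [
        {"key": prefix + symbol, "value": str(price).encode("utf-8")}
        for symbol, price in sorted(prices.items())
    ]


def is_valid_etcd_value(value):
    """Check the documented contract: a dict, or list of dicts, with str "key" and bytes "value"."""
    items = value if isinstance(value, list) else [value]
    return all(
        isinstance(item, dict)
        and isinstance(item.get("key"), str)
        and isinstance(item.get("value"), bytes)
        for item in items
    )
```

In a graph, something like stream.map(etcd_puts).etcd_pub("http://localhost:2379") could wire this up, assuming the upstream stream carries such price dicts.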
- filter(keep_func)
Propagates its source only when the supplied predicate (a Python callable) returns True; other ticks are dropped.
- finally(func)
- for_each(func)
- inspect(func)
- kdb_write(host, port, table, columns)
Write this stream to a KDB+ table.
- Parameters:
host – KDB+ server hostname
port – KDB+ server port
table – Name of the target KDB+ table
columns – List of (name, type) tuples for non-time columns. Supported types: “symbol”, “float”, “long”, “int”, “bool”
- Returns:
A Node that drives the write operation.
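Only five column types are supported, so a columns spec can be checked up front. The following is a minimal sketch: check_kdb_columns is a hypothetical helper, and rejecting duplicate names is an extra safety check rather than documented behaviour.

```python
SUPPORTED_KDB_TYPES = {"symbol", "float", "long", "int", "bool"}


def check_kdb_columns(columns):
    """Hypothetical pre-flight check for a kdb_write columns list of (name, type) tuples."""
    seen = set()
    for name, kdb_type in columns:
        if kdb_type not in SUPPORTED_KDB_TYPES:
            raise ValueError(f"column {name!r}: unsupported type {kdb_type!r}")
        if name in seen:  # extra check, not documented by kdb_write
            raise ValueError(f"duplicate column {name!r}")
        seen.add(name)
    return list(columns)
```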
- latency_report(stages, print_on_teardown=True)
Install a latency report sink. The stream must carry TracedBytes values. Per-stage delta statistics (count/min/mean/p50/p99/max) are printed on graph shutdown.
- Parameters:
stages – Stage names in order (same list used for Latency).
print_on_teardown – Whether to print the report on shutdown (default True).
- Returns:
A Node that drives the report sink.
- latency_report_if(stages, enabled, print_on_teardown=True)
Like latency_report, but only installs the sink when enabled is True. When False, returns the upstream as a Node (no report sink).
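The sketch below shows how a per-stage report of this kind could be computed. It rests on three assumptions:

- Each stage's delta is its stamp minus the previous stage's stamp.
- A plain dict of nanosecond wall-clock times stands in for the stamps a TracedBytes value carries.
- Percentiles use the nearest-rank method.

Wingfoil's actual delta definition and percentile method are not documented here.

```python
import math
import statistics


def stage_deltas(stamps, stages):
    """Deltas in ns for one message; assumed to be stamp minus the previous stage's stamp.

    `stamps` (stage name -> wall-clock ns) is a hypothetical stand-in for TracedBytes.
    The first stage has no predecessor, so it gets no delta.
    """
    return {cur: stamps[cur] - stamps[prev] for prev, cur in zip(stages, stages[1:])}


def nearest_rank(sorted_vals, pct):
    # Nearest-rank percentile (an assumption; wingfoil's method is not documented).
    rank = max(1, math.ceil(pct / 100 * len(sorted_vals)))
    return sorted_vals[rank - 1]


def summarize(deltas):
    """count/min/mean/p50/p99/max over one stage's deltas (must be non-empty)."""
    vals = sorted(deltas)
    return {
        "count": len(vals),
        "min": vals[0],
        "mean": statistics.fmean(vals),
        "p50": nearest_rank(vals, 50),
        "p99": nearest_rank(vals, 99),
        "max": vals[-1],
    }
```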
- limit(limit)
Propagates its source at most limit times.
- logged(label)
Logs each tick of its source together with label and propagates it unchanged. The default log level is INFO.
- map(func)
Maps its source into a new Stream using the supplied Python callable.
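The per-tick semantics of distinct, difference, filter and limit can be modelled on plain lists, with one element per tick. This helps when predicting a graph's output in tests. It is a hypothetical reference model that ignores timing. In particular, it assumes difference emits nothing on the first tick, which the docs do not state.

```python
def distinct(values):
    """Model of distinct(): propagate a tick only when it differs from the last propagated value."""
    out = []
    for v in values:
        if not out or v != out[-1]:
            out.append(v)
    return out


def difference(values):
    """Model of difference(): change from one tick to the next (assumed: nothing on the first tick)."""
    return [cur - prev for prev, cur in zip(values, values[1:])]


def keep(values, keep_func):
    """Model of filter(keep_func); named keep to avoid shadowing Python's built-in filter."""
    return [v for v in values if keep_func(v)]


def limit(values, n):
    """Model of limit(n): only the first n ticks propagate."""
    return values[:n]
```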
- not()
- otlp_push(metric_name, endpoint, service_name)
Push this stream as an OTLP gauge metric.
- Parameters:
metric_name – Name of the metric to report
endpoint – OTLP HTTP endpoint, e.g. “http://localhost:4318”
service_name – Service name reported in OTLP resource attributes
- Returns:
A Node that drives the push operation.
- peek_value()
- run(realtime, start=None, duration=None, cycles=None)
- sample(trigger)
- stamp(stage)
Stamp a named latency stage on each tick using the cycle-start wall-clock time. The stream must carry TracedBytes values.
- Parameters:
stage – Stage name (must match one of the names in the Latency).
- Returns:
A new Stream with the stage stamped.
- stamp_if(stage, enabled)
Like stamp, but only inserts the stamp node when enabled is True. When False, returns the stream unchanged — zero runtime cost.
- stamp_precise(stage)
Stamp a named latency stage with a precise wall-clock read (~5-10ns TSC read per tick). Gives intra-cycle resolution.
- stamp_precise_if(stage, enabled)
Like stamp_precise, but only inserts the stamp node when enabled is True. When False, returns the stream unchanged.
- sum()
Sums the stream (values are extracted as f64 before summing).
- web_pub(server, topic)
Publish this stream to a WebServer topic over WebSocket.
Values must be JSON-compatible Python objects (dict / list / str / int / float / bool / bytes / None). Connected browser clients that subscribed to topic receive the frames.
- Parameters:
server – A WebServer instance.
topic – The topic name to publish on.
- Returns:
A Node that drives the publish operation.
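Here is a sketch of a pre-flight check for the supported value types. It is a hypothetical helper. Treating tuples as unsupported and requiring str dict keys are assumptions, since the docs list only the allowed value types.

```python
# Scalar types listed as supported by web_pub (bool is also an int subclass).
ALLOWED_SCALARS = (str, int, float, bool, bytes, type(None))


def is_web_pub_value(value):
    """Hypothetical recursive check that a value uses only web_pub's listed types."""
    if isinstance(value, dict):
        # Assumption: dict keys must be str, as in JSON objects.
        return all(isinstance(k, str) and is_web_pub_value(v) for k, v in value.items())
    if isinstance(value, list):
        return all(is_web_pub_value(v) for v in value)
    return isinstance(value, ALLOWED_SCALARS)
```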
- with_time()
Pairs each value with the graph time as a (float, value) tuple, where the float is seconds since Unix epoch.
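The sketch below converts these tuples for display or storage; both helpers are hypothetical. A float64 near present-day epoch seconds resolves only to about 0.24 µs. Converting back to integer nanoseconds, the unit used by csv_write and py_csv_read, therefore cannot recover full nanosecond precision.

```python
from datetime import datetime, timezone


def with_time_to_datetime(pair):
    """Turn one (seconds_since_epoch, value) tuple from with_time() into (UTC datetime, value)."""
    seconds, value = pair
    return datetime.fromtimestamp(seconds, tz=timezone.utc), value


def seconds_to_ns(seconds):
    """Float epoch seconds -> integer nanoseconds (precision limited by the float input)."""
    return round(seconds * 1_000_000_000)
```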
- zmq_pub(port)
Publish this stream of bytes to a ZMQ PUB socket bound on the given port.
The stream values must be bytes objects. Only supported in real-time mode.
- Parameters:
port – TCP port to bind the PUB socket on
- Returns:
A Node that drives the publish operation.
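Only bytes are accepted, so structured values need encoding first. Here is a minimal sketch using JSON. JSON is an arbitrary choice; subscribers only need to decode with the same scheme.

```python
import json


def encode_quote(quote):
    """Hypothetical helper: serialize a dict to compact, key-sorted JSON bytes for zmq_pub."""
    return json.dumps(quote, separators=(",", ":"), sort_keys=True).encode("utf-8")
```

Something like quotes.map(encode_quote).zmq_pub(5556) would then publish it, assuming quotes is a Stream of dicts and the graph runs in real-time mode.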
- zmq_pub_etcd(name, port, endpoint)
Publish this stream of bytes to a ZMQ PUB socket and register it under name in etcd.
Binds on 127.0.0.1; use zmq_pub_etcd_on for multi-host deployments where 127.0.0.1 is not routable by subscribers on other hosts.
- Parameters:
name – Name / etcd key to register under (e.g. “quotes”)
port – TCP port to bind the PUB socket on
endpoint – etcd endpoint (e.g. “http://localhost:2379”)
- Returns:
A Node that drives the publish operation.
- zmq_pub_etcd_on(name, address, port, endpoint)
Like zmq_pub_etcd but binds on address instead of 127.0.0.1.
- Parameters:
name – Name / etcd key to register under
address – Routable bind address (e.g. “192.168.1.10”)
port – TCP port to bind the PUB socket on
endpoint – etcd endpoint
- Returns:
A Node that drives the publish operation.
- class WebServer(addr, codec=None, static_dir=None, historical=False)
Bases: object
Python-facing handle to a running WebServer.
Mirrors the builder on the Rust side: construct with a bind address, optionally set the codec and static-file directory, then register publish / subscribe streams.
- codec_name()
Return the codec currently used on the wire (“bincode” or “json”).
- port()
The bound port (0 when historical=True).
- sub(topic)
Subscribe to topic. Returns a source Stream of dicts / lists / primitives decoded from the clients’ frames.
To publish a stream, use the fluent stream.web_pub(server, topic) method on the stream itself.
- bimap(a, b, func)
Maps streams a and b into a new stream using func (e.g. lambda a, b: a + b).
- build_dataframe(stream_dict: Dict[str, Any]) → DataFrame
Builds a combined DataFrame from a dict of streams.
- constant(val)
A stream that ticks once, on the first engine cycle.
- py_csv_read(path, time_column)
Read a CSV file into a stream of dicts.
Each tick yields a dict where keys are column headers and values are strings. Rows sharing the same timestamp are collapsed to one-per-tick.
- Parameters:
path – Path to the CSV file (headers required)
time_column – Name of the column containing the timestamp (parsed as integer nanoseconds since epoch)
- py_etcd_sub(endpoint, prefix)
Subscribe to etcd keys matching a prefix.
Emits a snapshot of all existing keys under the prefix, then delivers live watch events as they arrive. Each tick yields a list of event dicts:
```python
{"kind": "put" | "delete", "key": str, "value": bytes, "revision": int}
```
- Parameters:
endpoint – etcd endpoint, e.g. “http://localhost:2379”
prefix – key prefix to watch, e.g. “/myapp/”
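A common way to consume these ticks is to fold them into a local mirror of the prefix. The following is a minimal sketch. apply_etcd_events is a hypothetical helper that ignores the revision field; it could be called from a map or for_each callback.

```python
def apply_etcd_events(state, events):
    """Hypothetical helper: fold one tick's list of etcd event dicts into a key -> value dict."""
    for event in events:
        if event["kind"] == "put":
            state[event["key"]] = event["value"]
        elif event["kind"] == "delete":
            # pop with a default: a delete for an unknown key is not treated as an error
            state.pop(event["key"], None)
    return state
```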
- py_fix_accept(port, sender_comp_id, target_comp_id)
Bind a FIX acceptor and return (data_stream, status_stream).
- Parameters:
port – Port to listen on
sender_comp_id – SenderCompID for this session
target_comp_id – Expected TargetCompID of the connecting initiator
- py_fix_connect(host, port, sender_comp_id, target_comp_id)
Connect to a FIX acceptor and return (data_stream, status_stream).
- Parameters:
host – FIX acceptor hostname
port – FIX acceptor port
sender_comp_id – SenderCompID for this session
target_comp_id – TargetCompID of the counterparty
- Returns:
A tuple of (data_stream, status_stream) where data_stream yields lists of message dicts and status_stream yields session status strings.
- py_fix_connect_tls(host, port, sender_comp_id, target_comp_id, password=None)
Connect to a TLS-secured FIX acceptor (e.g. LMAX) and return (data_stream, status_stream, injector_func).
The injector is a callable that sends a FIX message dict on the session:
```python
injector({"msg_type": "V", "fields": [(262, "req1"), (263, "1")]})
```
- Parameters:
host – FIX acceptor hostname
port – FIX acceptor port (typically 443 for TLS)
sender_comp_id – SenderCompID (usually your username)
target_comp_id – TargetCompID of the counterparty
password – Optional password (tag 554 in Logon)
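Here is a minimal sketch of a helper that builds messages in the injector's documented shape. Tags 262 (MDReqID) and 263 (SubscriptionRequestType) are standard FIX. A complete standard FIX 4.4 MarketDataRequest also needs tag 264 (MarketDepth) plus the entry-type and symbol groups, which this sketch omits. Venue-specific requirements (e.g. for LMAX) are not covered.

```python
def market_data_request(req_id, subscribe=True):
    """Hypothetical helper: a partial MarketDataRequest (35=V) dict for the injector.

    263="1" requests snapshot plus updates; 263="2" cancels a previous request.
    """
    return {
        "msg_type": "V",
        "fields": [(262, req_id), (263, "1" if subscribe else "2")],
    }
```

Any callable can stand in for the real injector when testing, for example a list's append method.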
- py_kdb_read(host, port, query, time_col, chunk_size=3600)
Read data from a KDB+ database.
Returns a Stream where each tick is a dict with column names as keys.
- Parameters:
host – KDB+ server hostname
port – KDB+ server port
query – q query to execute (e.g. “select from trade”)
time_col – Name of the time column for time-slice filtering
chunk_size – Duration of each time slice in seconds (default: 3600)
- Returns:
A Stream where each tick yields a dict of {column_name: value}.
Requires RunMode::HistoricalFrom with a non-zero start time and RunFor::Duration.
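The sketch below shows how chunk_size could divide a run into time slices. time_slices is hypothetical and assumes half-open [lo, hi) windows in nanoseconds; wingfoil's actual boundary handling is not documented here. Under this model, a one-day run with the default 3600-second chunk splits into 24 slices.

```python
def time_slices(start_ns, duration_ns, chunk_size_s=3600):
    """Hypothetical model: split [start_ns, start_ns + duration_ns) into chunk_size_s windows."""
    if chunk_size_s <= 0:
        raise ValueError("chunk_size_s must be positive")
    step = chunk_size_s * 1_000_000_000
    end = start_ns + duration_ns
    slices = []
    lo = start_ns
    while lo < end:
        hi = min(lo + step, end)  # the last slice may be shorter than chunk_size_s
        slices.append((lo, hi))
        lo = hi
    return slices
```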
- py_kdb_write(host, port, table, columns, upstream)
Write stream data to a KDB+ table.
- Parameters:
host – KDB+ server hostname
port – KDB+ server port
table – Name of the target KDB+ table
columns – List of (name, type) tuples for non-time columns. Supported types: “symbol”, “float”, “long”, “int”, “bool”
upstream – The Stream to consume
- Returns:
A Node that drives the write operation.
- py_zmq_sub(address)
Subscribe to a ZMQ PUB socket.
Returns a (data_stream, status_stream) tuple.
Each tick of data_stream yields a list[bytes] containing all messages received in that cycle. Each tick of status_stream yields the string “connected” or “disconnected”.
- Parameters:
address – ZMQ endpoint to connect to (e.g. “tcp://localhost:5556”)
- Returns:
Tuple of (data_stream, status_stream)
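Here is a sketch of decoding one data_stream tick. It assumes the publisher JSON-encoded each message and that messages carry a hypothetical "sym" field. Several messages can arrive in one cycle, so the second helper keeps only the latest per symbol.

```python
import json


def decode_batch(frames):
    """Decode one data_stream tick (list[bytes]); assumes JSON-encoded messages."""
    return [json.loads(frame.decode("utf-8")) for frame in frames]


def latest_by_symbol(frames):
    # Later messages in the same cycle overwrite earlier ones for the same (hypothetical) "sym".
    latest = {}
    for msg in decode_batch(frames):
        latest[msg["sym"]] = msg
    return latest
```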
- py_zmq_sub_etcd(name, endpoint)
Subscribe to a named ZMQ publisher via etcd service discovery.
Looks up name in etcd and connects to the publisher at the stored address. The lookup happens at call time — ensure the publisher is registered before calling this function.
- Parameters:
name – Publisher name / etcd key (e.g. “quotes”)
endpoint – etcd endpoint (e.g. “http://localhost:2379”)
- Returns:
Tuple of (data_stream, status_stream)
- Raises:
RuntimeError – if the key is absent or etcd is unreachable
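The lookup happens once, at call time. A short retry loop can therefore cover start-up races where the subscriber starts before the publisher has registered. The following is a minimal sketch. with_retry is a hypothetical helper, and its attempt count and delay are arbitrary.

```python
import time


def with_retry(connect, attempts=5, delay_s=0.5, sleep=time.sleep):
    """Hypothetical helper: call connect() until it stops raising RuntimeError.

    `sleep` is injectable so tests can skip the real delay.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except RuntimeError:
            if attempt == attempts:
                raise  # out of attempts: surface the original error
            sleep(delay_s)
```

For example: data, status = with_retry(lambda: py_zmq_sub_etcd("quotes", "http://localhost:2379")).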
- ticker(seconds)
A node that ticks at the specified period, given in seconds.
- to_dataframe(data: List[Any] | Dict[str, List[Any]]) → DataFrame
Converts a list of values, or a dict of lists, into a DataFrame.