At RocketFin, our team of software and data engineers is continually exploring new tools and technologies to enhance the value we deliver to our clients in the financial industry. 3forge is a technology company that provides real-time data visualization and analytics solutions for financial institutions. Their flagship platform, AMI, integrates vast amounts of data from multiple sources, allowing users to analyze, visualize, and interact with complex datasets quickly. 3forge's tools are designed to improve decision-making and operational efficiency, particularly in trading, risk management, and compliance within the financial industry.
In this three-part blog series, we will (a) create a backend with which to retrieve data and push it to AMI, (b) create a tabular visualization of our data within AMI with streaming price data, and finally (c) create a candlestick chart using AMI’s charting capabilities using the ticker data we have processed.
Real-Time Data Integration: AMI can connect to and consolidate data from multiple sources, including databases, APIs, and streaming data, in real-time. It can process hundreds of thousands of messages per second, which is vital when dealing with financial information.
Advanced Data Visualization: The platform allows users to create highly customizable dashboards and visualizations, enabling them to monitor and analyze complex datasets effectively.
Complex Event Processing: AMI can handle high-frequency data and perform complex event processing, making it ideal for trading, risk management, and other time-sensitive operations.
Scalability: The platform is designed to handle large volumes of data, and its various constituent parts can scale according to the needs of the organization both vertically and horizontally.
User-Friendly Interface: AMI provides an intuitive interface for both technical and non-technical users, allowing them to interact with data and build custom applications without extensive coding. This is not limited to navigating existing dashboards but to creating new ones as needed.
Security and Compliance: AMI supports stringent security protocols and compliance measures, ensuring that sensitive financial data is protected and meets regulatory requirements.
Custom Application Development: The platform allows for the development of bespoke applications that can be tailored to specific business needs, leveraging the underlying data and processing capabilities of AMI. To this end, they also offer several libraries (Java, Python, .NET) that make integrating with AMI very straightforward. Otherwise, developers can also choose to open a raw Telnet connection to the AMI relay. In this blog, we will be making use of the Python library.
In this demo application, we will be opening a web socket to Yahoo’s finance API, subscribing to several symbols, decoding the protobuf encoded messages as we receive them, and subsequently sending them to AMI relay for further processing.
As previously mentioned, our backend service will be written in Python, and the most recent version of the AMI Client library is available on request from 3Forge by sending an email to support@3forge.com.
We will also be using the following Yahoo Finance ticker data protobuf definition available on our GitLab snippet instance here, entitled `ticker.proto`. This can be compiled using protobuf using the following command:
protoc.exe --python_out=. ticker.proto
This generated output file should be similar to the `ticker_pb2.py` file included in the GitLab snippet.
We will also be using the protobuf3-to-dict and websockets python packages, and venv to better manage our Python environment, however setting this up is outside the scope of this post.
The websocket connection we establish needs to be able to do 2 things:
Let’s first define the skeleton code that our websocket will use:
import websockets
async def websocket_client(uri: str) -> None:
async with websockets.connect(uri) as websocket:
await on_open(websocket)
await message_handler(websocket)
Here, we have a function called websocket_client, that takes a single parameter of type string which we call uri (this will be the uri to the yahoo finance websocket). We then create a new websocket and connect to the provided uri, which is wrapped in a with statement such that the context is bound to a limited scope, and it can later be disposed of automatically.
Next, we wait for the asynchronous function on_open to complete, and subsequently process any messages that we’re sent over the socket using the message_handler function.
import json
async def on_open(ws: websockets.WebSocketClientProtocol):
msg = json.dumps({"subscribe": ['NVDA', 'SHEL.L']})
await ws.send(msg)
The on_open function is straightforward. We compose a json string using a dictionary that defines the symbols we want to subscribe to and send that over the websocket. In this example, I am subscribing to NVDA (Nvidia, which is traded on NASDAQ) and SHEL.L (Shell, which is traded on LSE).
import base64
from protobuf_to_dict import protobuf_to_dict
from ticker_pb2 import Ticker
async def message_handler(websocket: websockets.WebSocketClientProtocol):
async for message in websocket:
try:
ticker = Ticker()
message_bytes = base64.b64decode(message)
ticker.ParseFromString(message_bytes)
messageDict = protobuf_to_dict(ticker, use_enum_labels=True)
print(messageDict)
except Exception as e:
print(e)
The message_handler function operates on the basis of an infinite loop for each message that exists in the socket. First, we create an instance of the generated protobuf dto, we decode the received message using base64, and populate the data using protobuf’s ParseFromString function. The populated object is converted to a dictionary and printed out.
The last step is to handle the application startup, and gracefully terminate when we send a sigint (ctrl + C). This can be done using various mechanisms, but we chose to do so using asyncio.
if __name__ == "__main__":
try:
# URI of the websocket server
uri = "wss://streamer.finance.yahoo.com/"
asyncio.run(websocket_client(uri))
except KeyboardInterrupt:
print("Keyboard interrupt | Received exit signal, shutting down...")
sys.exit(0)
When we start our service, we can see the various executions streaming in. Keep in mind that if you’re doing this out of market hours, there will be significantly less activity, especially on the weekends. This is a trap that we fell into a couple of times that really had us scratching our heads!
Let’s now create the class that will manage our connection with AMI. You can find the complete source for this in the same GitLab snippet with the title forgeClient.py.
def __init__(self) -> None:
super().__init__()
self.client = AmiClient()
options = Options.ENABLE_SEND_SEQNUM + Options.ENABLE_SEND_TIMESTAMPS
self.client.addListener(self)
self.client.start(os.environ['AMI_ADDRESS'], 3289, "demo", options)
The class initializer creates an instance of the AmiClient provided by the library, configures it to send sequence numbers and timestamps, binds the appropriate event listeners and connects to AMI itself . Since we chose to dockerise our instance of AMI, we are retrieving the hostname using an environment variable, however if you’re running AMI One locally, that will default to “127.0.0.1”. Port 3289 and the use “demo” are both defaults used by AMI Relay. These can be changed in AMI’s properties.
def sendRawDict(self, input_dict: Dict[str, Union[str, int]]) -> None:
self.client.startObjectMessage("YA_Trade_Raw")
for key in input_dict:
self.add_param(key, input_dict[key])
self.client.sendMessageAndFlush()
def add_param(self, key: str, value: Union[str, int, float]):
variable_type = type(value).__name__
switch = {
'int': lambda: self.client.addMessageParamLong(key, int(value)),
'str': lambda: self.client.addMessageParamString(key, str(value)),
'float': lambda: self.client.addMessageParamFloat(key, float(value)),
}
switch.get(variable_type, lambda: self.client.addMessageParamString(
key, str(value)))()
The next two functions are what we will use to forward the messages we received over the websocket connection with Yahoo Finance. The sendRawDict function takes as input any arbitrary dictionary. We start a new object message of type YA_Trade_Raw - raw because we’ll perform some additional processing on the records once they’re loaded into AMI. One dictionary property at a time, we proceed to dynamically construct our message by adding properties depending on what each of their value types are. Finally, we send the message and block until it is fully read by AMI.
In the interest of time, we won’t go into the individual lifecycle methods, but these are all self-explanatory and included in the snippet available on GitLab.
The last thing we need to do is create an instance of forgeClient in our main service code and invoke the sendRawDict function instead of printing out the dictionary which we have received over the socket. Similarly, this code is available in the GitLab snippet as service.py.
Running the service now, we can see that a connection is established with the AMI relay, and messages analogous to the below streaming across:
O#27@1724138679280|T="YA_Trade_Raw"|id='SHEL.L'|price=2782.5F|time=1724137777000L|exchange='LSE'|quoteType='EQUITY'|marketHours='REGULAR_MARKET'|changePercent=-1.0842516422271729F|dayVolume=159064L|change=-30.5F|lastSize=27L|priceHint=2L
O = message type is an object
27 = sequence id, which we enabled using the options flags
@1724138679280 = the timestamp, which we enabled using the options flags
T = the object type, which we specified when starting the object
Id, price, time = the message parameters which we constructed dynamically using the dictionary
In our next blog post, we will focus on AMI itself, where we transform the messages that we have received to build a screen showing us ticking prices. In the meantime, if you have any questions, comments, or concerns about this post, you can reach me at mark.buhagiar@rocketfin.co. I will be happy to discuss this topic further, as well as how RocketFin can help your organisation leverage 3Forge and the AMI platform to achieve your objectives more effectively and efficiently.