In this hands-on tutorial you will create an IBM Event Streams service on IBM Cloud and run the sample application to produce events on the topic "kafka-python-console-sample-topic". Using IBM Cloud Functions you will then create a trigger that invokes a function every time an event arrives on the topic "kafka-python-console-sample-topic".
- Sign up for an IBM Cloud account.
- Fill in the required information and press the "Create Account" button.
- After you submit your registration, you will receive an e-mail from the IBM Cloud team with details about your account. In this e-mail, you will need to click the link provided to confirm your registration.
- Now you should be able to log in to your new IBM Cloud account ;-)
Go back to your IBM Cloud Dashboard by clicking the IBM Cloud logo on the upper left. Under Services you will find the Event Streams service in the corresponding region. Access your service by clicking its name.
The next step is to create a topic that your sample application can write to. On the Manage page of your Event Streams service, click "Topics" and then "Create topic +".
Enter "kafka-python-console-sample-topic", as this is the topic used by the sample Event Streams application, and click "Next".
On the next pages, leave the values at their defaults, click "Next", and finally click "Create topic". Your newly created topic is then displayed.
An application can only access the service via its service credentials. Therefore we need to add service credentials as follows:
- Click on "Service credentials" on the left side on the Event Streams page.
- Then click on "New credential +" on the upper right side.
- Leave the service name at its default value, leave the role set to "Manager", and click "Add".
- Click **View credentials** to see the `api_key` and `kafka_brokers_sasl` values.
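These two values are everything a Kafka client needs to authenticate against Event Streams. The sketch below shows how they map to client configuration; the broker hosts and API key are placeholders, and the config keys follow the confluent-kafka naming used by the sample application.

```python
# Sketch of how the service credentials map to Kafka client configuration.
# The broker hosts and API key are placeholders; substitute the values
# shown under "View credentials".
kafka_brokers_sasl = ["host1:9093", "host2:9093"]  # from kafka_brokers_sasl
api_key = "YOUR_API_KEY"                           # from api_key

client_conf = {
    "bootstrap.servers": ",".join(kafka_brokers_sasl),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    # Event Streams authenticates with SASL PLAIN: the username is the
    # literal string "token" and the password is the API key.
    "sasl.username": "token",
    "sasl.password": api_key,
}
print(client_conf["bootstrap.servers"])
```

The same dictionary can later be handed to a confluent-kafka Producer or Consumer constructor.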
IBM Event Streams Service is a high-throughput message bus built with Apache Kafka. To get started with Event Streams and start sending and receiving messages, you can use the IBM Event Streams sample application.
Install git if you don't already have it.
Install Python 3.6 or later.
- On macOS: open Keychain Access and export all certificates in System Roots to a single .pem file on your local machine.
The sample application is stored on GitHub. Clone the event-streams-samples repository by running the clone command from the command line.
git clone https://github.com/ibm-messaging/event-streams-samples.git
When the repository is cloned, change from the command line into the kafka-python-console-sample directory.
cd event-streams-samples/kafka-python-console-sample/
pip install -r requirements.txt
To run the producer sample, execute the following command:
python3 app.py <kafka_brokers_sasl> <kafka_admin_url> <api_key> <ca_location> -producer
To find the values for <kafka_brokers_sasl>, <kafka_admin_url>, and <api_key>, access your Event Streams instance in IBM Cloud®, go to the Service Credentials tab, and select the credentials you want to use.
<ca_location> is the path where the trusted SSL certificates are stored on your machine and is therefore system dependent. For example:
For example:
- Ubuntu: /etc/ssl/certs
- RedHat: /etc/pki/tls/cert.pem
- macOS: The .pem file you created in the prerequisite section
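If you script the call, the platform switch above can be expressed as a small helper. The macOS path below is a placeholder for wherever you exported the .pem file:

```python
def default_ca_location(platform: str) -> str:
    """Return a typical <ca_location> for the given sys.platform value.

    These mirror the defaults listed above; adjust the path if your
    system stores its CA bundle elsewhere.
    """
    if platform.startswith("linux"):
        # Ubuntu keeps a certificate directory; RedHat uses the single
        # bundle file /etc/pki/tls/cert.pem instead.
        return "/etc/ssl/certs"
    if platform == "darwin":
        # macOS: the .pem file exported from Keychain Access
        # (placeholder path).
        return "/path/to/exported-certs.pem"
    raise ValueError(f"unsupported platform: {platform}")

print(default_ca_location("linux"))
```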
Note: <kafka_brokers_sasl> must be a single string enclosed in quotes, for example "host1:port1,host2:port2". We recommend using all the Kafka hosts listed in the credentials you selected.
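The quoted value is simply a comma-separated list of host:port pairs, which a script can split apart if needed (placeholder hosts):

```python
# Placeholder broker string in the quoted "host1:port1,host2:port2" form.
kafka_brokers_sasl = "host1:port1,host2:port2"

# Split into the individual host:port entries.
brokers = kafka_brokers_sasl.split(",")
print(brokers)
```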
The sample will run indefinitely until interrupted. To stop the process, use Ctrl+C.
Producer app example console output on macOS:
Note: The service credentials were deleted after this tutorial was written, so the api_key in the example above is no longer valid.
To run the consumer sample, open a second command line window and execute the following command:
python3 app.py <kafka_brokers_sasl> <kafka_admin_url> <api_key> <ca_location> -consumer
The sample will run indefinitely until interrupted. To stop the process, use Ctrl+C.
Consumer app example console output on macOS:
Note: The service credentials were deleted after this tutorial was written, so the api_key in the example above is no longer valid.
Note: The current namespace is displayed after "Current namespace:" in the command output.
Note: You can create a separate namespace for this tutorial by creating a new Cloud Foundry org and space. Ensure that you create your separate namespace in the same region in which you created the Event Streams service in task 1) above.
Note: Ensure that you target the same resource group in which you created the Event Streams service in task 1) above. If you left the value at its default, enter:
ibmcloud target -g default
Test it
ibmcloud fn list
IBM Cloud™ Functions offers a catalog of templates to help you get started on your next project. Templates are a combination of actions, triggers, and sequences. Some templates also incorporate other services from IBM Cloud. By using these templates, you can understand how IBM Cloud™ Functions entities work together and even use these entities as a basis for your own project.
More information about the quickstart templates
Clone the template repo.
git clone https://github.com/ibm-functions/template-messagehub-trigger.git
Navigate to the directory for the action runtime that you want to use, for example python.
cd template-messagehub-trigger/runtimes/python
Deploy the template by using the following environment variables.
KAFKA_BROKERS=<hosts> KAFKA_TOPIC=<topic> KAFKA_ADMIN_URL=<admin_url> \
MESSAGEHUB_USER=<username> MESSAGEHUB_PASS=<password> \
PACKAGE_NAME=<name> RULE_NAME=<name> TRIGGER_NAME=<name> \
ibmcloud fn deploy -m manifest.yaml
Understanding the environment variables:

Variable | Description |
---|---|
KAFKA_BROKERS | Your Event Streams broker hosts (kafka_brokers_sasl). See Service Credentials in task 4) |
KAFKA_TOPIC | The topic to subscribe to. In our tutorial "kafka-python-console-sample-topic" |
KAFKA_ADMIN_URL | The Event Streams admin URL. See Service Credentials in task 4) |
MESSAGEHUB_USER | Your Event Streams username. See Service Credentials in task 4). In our case "token" |
MESSAGEHUB_PASS | Your Event Streams password. See Service Credentials in task 4). |
PACKAGE_NAME | A custom name for the package. In our tutorial "kafka-template-package-01" |
RULE_NAME | A custom name for the rule. In our tutorial "tutorial-write-rule-01" |
TRIGGER_NAME | A custom name for the trigger. In our tutorial "tutorial-write-trigger-01" |
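When the trigger fires, the deployed action receives the consumed records in a `messages` array. The sketch below shows the general shape with illustrative field values (real events carry additional metadata):

```python
# Illustrative payload shape delivered to the action by the trigger;
# all field values here are made up for demonstration.
sample_event = {
    "messages": [
        {
            "topic": "kafka-python-console-sample-topic",
            "partition": 0,
            "offset": 42,
            "value": "This is a test message #0",
        }
    ]
}

# The action reads the Kafka record payload from the "value" field.
first_value = sample_event["messages"][0]["value"]
print(first_value)
```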
List the entities you have deployed in your namespace
ibmcloud fn list
List the details of the trigger you deployed
ibmcloud fn trigger get tutorial-write-trigger-01
List the details of the rule you deployed
ibmcloud fn rule get tutorial-write-rule-01
Open a separate command line window and run the standalone sample application to produce messages, as outlined in task 8)
Switch back to the command line window you used in task 12) where you deployed the Event Streams template
Poll the activation logs to see whether the incoming messages trigger the process-message function
ibmcloud fn activation poll
We can see that the process-message function is activated each time a message arrives on the topic.
We also see that the process-message function produces an error because it expects a different message format than the one that was produced.
Change to the directory where you downloaded the template-messagehub-trigger repository in task 12)
cd template-messagehub-trigger/runtimes/python
Open the file actions/process-message.py with an editor
vi actions/process-message.py
Delete the existing content and replace it with the following:
def main(dict):
    messages = dict.get('messages')
    if messages is None or messages[0] is None:
        return { 'error': "Invalid arguments. Must include 'messages' JSON array with 'value' field" }
    try:
        val = messages[0]['value']
    except KeyError:
        return { 'error': "Invalid arguments. Must include 'messages' JSON array with 'value' field" }
    for msg in messages:
        print(msg)
    return { 'msg': msg }
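Before redeploying, you can sanity-check the handler locally. A minimal sketch that defines the same logic (with the parameter renamed to `params` to avoid shadowing the built-in `dict`, and an extra guard for an empty `messages` list) and invokes it with a hypothetical payload:

```python
def main(params):
    # Same logic as the modified process-message action, plus a guard
    # against an empty messages list.
    messages = params.get('messages')
    if messages is None or not messages or messages[0] is None:
        return { 'error': "Invalid arguments. Must include 'messages' JSON array with 'value' field" }
    try:
        messages[0]['value']
    except KeyError:
        return { 'error': "Invalid arguments. Must include 'messages' JSON array with 'value' field" }
    for msg in messages:
        print(msg)
    return { 'msg': msg }

# Invoke with a minimal payload, as Cloud Functions would.
result = main({'messages': [{'value': 'hello'}]})
print(result)
```

The real action receives its payload from the trigger; this local call only checks the parsing logic.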
Save the file
Deploy the modified template as you did in task 12)
KAFKA_BROKERS=<hosts> KAFKA_TOPIC=<topic> KAFKA_ADMIN_URL=<admin_url> \
MESSAGEHUB_USER=<username> MESSAGEHUB_PASS=<password> \
PACKAGE_NAME=<name> RULE_NAME=<name> TRIGGER_NAME=<name> \
ibmcloud fn deploy -m manifest.yaml
Open a separate command line window and run the standalone sample application to produce messages, as outlined in task 8)
Switch back to the command line window you used in task 12) where you deployed the Event Streams template
Poll the activation logs to see whether the incoming messages trigger the modified process-message function
ibmcloud fn activation poll
We can see that the process-message function is activated each time a message arrives on the topic.
We see the modified process-message function running without error.