Streaming near real time


#1

I was reading the release notes for version 1.11, and one of the coolest things I can’t wait to get my hands on is the streaming feature. I always wanted to do something with that. But there’s something I didn’t fully get: “…streaming data in near real-time across all data providers”.

I mean, what is “near real-time” for you? It sounds like streaming with some delay, but that is still real time.


#2

That’s right. Our new Time Slider component, introduced with our 1.11 release, includes a player with a “live” mode. When this mode is selected, the data player keeps advancing with the current time after it finishes playing historical data. It is “near real-time” because it uses a pull strategy to fetch the latest data as it keeps landing in a store such as Elasticsearch. The configurable refresh interval (1 second by default) determines how soon new data is reflected in the visualizations.

On the other hand, we call our ksql data provider “real-time” because it is event-driven: events are reflected in the visualizations as they arrive. ChartFactor allows users to visualize streaming data using either of these two approaches.
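To make the distinction concrete, here is a minimal Python sketch of the two approaches. It is not ChartFactor's implementation: `poll_store`, `EventBus`, and the in-memory `store` list are hypothetical stand-ins for a store such as Elasticsearch and for an event stream.

```python
import time


def poll_store(store, last_seen, refresh_seconds, rounds, on_data):
    """Pull strategy: on every refresh interval, fetch whatever is new."""
    for _ in range(rounds):
        new_rows = store[last_seen:]   # rows that landed since the last poll
        if new_rows:
            on_data(new_rows)
            last_seen = len(store)
        time.sleep(refresh_seconds)    # new data can wait up to this long
    return last_seen


class EventBus:
    """Push strategy: subscribers are called as soon as an event arrives."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def publish(self, event):
        for callback in self._subscribers:
            callback(event)


if __name__ == "__main__":
    # Near real-time: data is picked up on the next poll.
    store = [{"id": 1}, {"id": 2}]
    poll_store(store, 0, refresh_seconds=0.1, rounds=2, on_data=print)

    # Real-time: the subscriber runs at publish time.
    bus = EventBus()
    bus.subscribe(print)
    bus.publish({"id": 3})
```

With the pull strategy, the delay before new data shows up is bounded by the refresh interval; with the push strategy there is no interval to wait for.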


#3

Hi guys, is there a demo of this feature that we can set up easily? Thanks.


#4

Hi @emma84, we do not have a public demo of the streaming functionality at the moment, but we will add one in the future. For now, you can simulate “near real-time” data by manually indexing documents into Elasticsearch at custom time intervals and configuring the Time Slider to play the data. For example, here is the idea in broad strokes as a Python script:

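import time
from datetime import datetime

from elasticsearch import Elasticsearch

# With no arguments, older client versions connect to localhost:9200
es = Elasticsearch()
index_name = 'my_index_name'  # Elasticsearch index names must be lowercase
interval_seconds = 1          # your custom interval between documents

def get_mappings():
    # "transactions" is the mapping type (used by Elasticsearch 6.x and earlier)
    return {
        "transactions": {
            "properties": {
                "timestamp": {
                    "type": "date",
                    "format": "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"
                },
                # ... your other field mappings
            }
        }
    }

def create_index(client):
    body = {
        "settings": {
            "number_of_shards": 1
        },
        "mappings": get_mappings()
    }
    # ignore=400 keeps the script going if the index already exists
    client.indices.create(index=index_name, ignore=[400, 404], body=body)

def create_doc(r, ts):
    # r is the source row; ts is the datetime stored as the timestamp
    return {
        "timestamp": ts.isoformat(timespec='microseconds'),
        # ... your other fields, taken from r
    }

create_index(es)
row_data = {}  # placeholder: the record you want to index
count = 0
while True:
    now = datetime.now()
    doc = create_doc(row_data, now)
    es.index(index=index_name, id=count, doc_type='transactions', body=doc)
    count += 1
    time.sleep(interval_seconds)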

Now you can create your Time Slider component with a configuration similar to this:

let field = cf.Attribute("timestamp").func("SECOND");

cf.provider("YourDataProvider")
  .source('YourDataSource')
  .timeField(field)
  .graph("Time Slider")
  .set("player", {
    'step-unit': 'day',
    'step': 1,
    'refresh': 2,
    'pin-left': true,
    'live': true,
    'autoplay': true
  })
  .execute();

Note that this code is not intended to be run as is; it is just a guide. Let me know if something is unclear or does not work in your implementation. Best regards.


#5

Hi @dario, thanks for the guide, but I am a little new to Elasticsearch and Python, and a couple of questions come to mind. The first one is about the initialization of Elasticsearch itself: do I need a custom configuration? There is no connection string in the Python code, so how does this code know where the Elasticsearch instance is? From the three dots in the Python script I infer that I need to complete it with other mappings (a kind of data schema?). And in the create_doc function, what exactly is the r argument? Sorry if this is too obvious, but I would appreciate it if you could answer these questions and point me to some extra resources. Thanks.


#6

Hi @emma84, if you feel more comfortable with another programming language, Elasticsearch should have a driver or client for it. Whether you use Python or not, the first thing you need to do is configure your Elasticsearch installation. In the elasticsearch.yml file, enable CORS:

http.cors.enabled: true
http.cors.allow-origin: "*"

and, if you want to access Elasticsearch from outside the local machine, add:

network.bind_host: 0.0.0.0
network.host: 0.0.0.0

You can find more details on Elasticsearch setup at https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html

After that, if you want to use Python as the streaming generator, you have to install the Elasticsearch client. With pip it is as easy as:

pip install elasticsearch
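On the question of how the script finds Elasticsearch: as far as I know, older versions of the Python client fall back to localhost:9200 when `Elasticsearch()` is called with no arguments, while newer versions may require an explicit address. Here is a minimal sketch of passing the address yourself. The helper names `es_url` and `connect` and the address 192.168.1.50 are made up for illustration.

```python
def es_url(host="localhost", port=9200, scheme="http"):
    """Build the URL the client should connect to."""
    return f"{scheme}://{host}:{port}"


def connect(url):
    """Return a client for the given URL (needs `pip install elasticsearch`)."""
    # Imported here so the URL helper works without the package installed.
    from elasticsearch import Elasticsearch
    return Elasticsearch([url])


if __name__ == "__main__":
    print(es_url())                # local default
    print(es_url("192.168.1.50"))  # hypothetical remote host
    # es = connect(es_url())
    # es.ping() returns True when the node answers
```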

Then complete the template that I provided in the previous post. As you understood, the three dots have to be replaced with your own mapping, depending on the data you are going to store. The same applies to the create_doc function, where you have to assign a value to each of the mapping entries you define. The r parameter could be the row data object used as the source for the document you are creating.
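As an illustration only, here is how the two pieces could look once filled in. The fields `amount` and `customer` and the sample `row` dictionary are hypothetical; replace them with whatever your data contains. The mapping type "transactions" follows the earlier script.

```python
from datetime import datetime


def get_mappings():
    """Mapping with the timestamp field plus two made-up fields."""
    return {
        "transactions": {
            "properties": {
                "timestamp": {
                    "type": "date",
                    "format": "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
                },
                "amount": {"type": "float"},      # hypothetical field
                "customer": {"type": "keyword"},  # hypothetical field
            }
        }
    }


def create_doc(r, ts):
    """Build one document: r is the source row, ts the event time."""
    return {
        # timespec keeps the microseconds even when they are zero,
        # so the value always matches the mapping's date format
        "timestamp": ts.isoformat(timespec="microseconds"),
        "amount": float(r["amount"]),
        "customer": r["customer"],
    }


if __name__ == "__main__":
    row = {"amount": "12.5", "customer": "acme"}  # sample row, made up
    print(create_doc(row, datetime.now()))
```

Every key returned by create_doc has a matching entry under "properties", which is the rule of thumb to follow when you add your own fields.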

You can find more details on the Python client for Elasticsearch at https://elasticsearch-py.readthedocs.io/en/master/

Let me know if you have other questions. Best regards.