
HANA Zeppelin Query Builder with Map Visualization

SAP HANA Query Builder On Apache Zeppelin Demo

While working with Apache Zeppelin I found that users wanted a way to explore data and build charts without needing to know SQL right away. This note is an attempt to let a new data scientist get familiar with the structure of their database and build simple single-table queries that can be turned into charts and maps quickly. It also shows the SQL used to perform the work.

Demo

This video demonstrates how it works. I leveraged Randy Gelhausen's query builder post for the approach to building a where clause, and Damien Sorel's jQuery QueryBuilder for the widget itself. These were used to make a series of paragraphs that look up tables and columns in HANA and let the user build a custom query. The resulting data can be quickly graphed using the Zeppelin Helium visualizations.

The Code

This is for those data scientists and coders that want to replicate this in their Zeppelin.

Note that this code is imperfect, as I have not worked out all the issues with it. You may need to make changes to get it to work. It only works on Zeppelin 0.8.0 Snapshot, and it is written to work with SAP HANA as the database.

It only has one type of aggregation (sum) and it does not have a way to add a HAVING clause, but these features could easily be added.

This Zeppelin note is dependent on code from a previous post. Follow the directions in Using Zeppelin to Explore a Database first.

Paragraph One

%spark
//Get list of columns on a given table
def columns1(table: String) : Array[(String)] = {
 sqlContext.sql("select * from " + table + " limit 0").columns.map(x => x.asInstanceOf[String])
}

def columns(table: String) : Array[(String, String)] = {
 sqlContext.sql("select * from " + table + " limit 0").columns.map(x => (x, x))
}

def number_column_types(table: String) : Array[String] = {
 var columnType = sqlContext.sql("select column_name from table_columns where table_name='" +
    table + "' and data_type_name = 'INTEGER'")
 
 columnType.map {case Row(column_name: String) => (column_name)}.collect()
}

// set up the tables select list
val tables = sqlContext.sql("show tables").collect.map(s=>s(1).asInstanceOf[String].toUpperCase())
z.angularBind("tables", tables)
var sTable ="tables"
z.angularBind("selectedTable", sTable)


z.angularUnwatch("selectedTable")
z.angularWatch("selectedTable", (before:Object, after:Object) => {
 println("running " + after)
 sTable = after.asInstanceOf[String]
 // put the id for paragraph 2 and 3 here
 z.run("20180109-121251_268745664")
 z.run("20180109-132517_167004794")
})


var col = columns1(sTable)
col = col :+ "*"
z.angularBind("columns", col)
// hack to make the where clause work on initial load
var col2 = columns(sTable)
var extra = ("1","1")
col2 = col2 :+ extra
z.angularBind("columns2", col2)
var colTypes = number_column_types(sTable)
z.angularBind("numberColumns", colTypes)
var sColumns = Array("*")
// hack to make the where clause work on initial load
var clause = "1=1"
var countColumn = "*"
var limit = "10"

// setup for the columns select list
z.angularBind("selectedColumns", sColumns)
z.angularUnwatch("selectedColumns")
z.angularWatch("selectedColumns", (before:Object, after:Object) => {
 sColumns = after.asInstanceOf[Array[String]]
 // put the id for paragraph 2 and 3 here
 z.run("20180109-121251_268745664")
 z.run("20180109-132517_167004794")
})
z.angularBind("selectedCount", countColumn)
z.angularUnwatch("selectedCount")
z.angularWatch("selectedCount", (before:Object, after:Object) => {
 countColumn = after.asInstanceOf[String]
})
// bind the where clause
z.angularBind("clause", clause)
z.angularUnwatch("clause")
z.angularWatch("clause", (oldVal, newVal) => {
 clause = newVal.asInstanceOf[String]
})

z.angularBind("limit", limit)
z.angularUnwatch("limit")
z.angularWatch("limit", (oldVal, newVal) => {
 limit = newVal.asInstanceOf[String]
})

This paragraph is Scala code that sets up the functions used to query the view listing the tables and the view listing the columns. You must have the tables loaded into Spark as views or tables in order to see them in the select lists. This paragraph also performs all of the Angular bindings so that the next paragraph, which is Angular code, can use the data built here.
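As a quick illustration (not part of the note itself, and using placeholder names), registering any DataFrame as a temp view is enough for it to appear in the table select list, because that list is built from "show tables":

%spark
import spark.implicits._
// placeholder DataFrame; any registered view will show up in the table list
val demo_df = Seq((1, "a"), (2, "b")).toDF("ID", "NAME")
demo_df.createOrReplaceTempView("DEMO_TABLE")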

Paragraph Two

%angular
<link rel="stylesheet" href="https://cdn.rawgit.com/mistic100/jQuery-QueryBuilder/master/dist/css/query-builder.default.min.css">
<script src="https://cdn.rawgit.com/mistic100/jQuery-QueryBuilder/master/dist/js/query-builder.standalone.min.js"></script>

<script type="text/javascript">
  var button = $('#generateQuery');
  var qb = $('#builder');
  var whereClause = $('#whereClause');
 
  button.click(function(){
    whereClause.val(qb.queryBuilder('getSQL').sql);
    whereClause.trigger('input'); //triggers Angular to detect changed value
  });
 
  // this builds the where statement builder
  var el = angular.element(qb.parent('.ng-scope'));
  angular.element(el).ready(function(){
    var integer_columns = angular.element('#numCol').val()
    //Executes on page-load and on update to 'columns', defined in first snippet
    window.watcher = el.scope().compiledScope.$watch('columns2', function(newVal, oldVal) {
      //Append each column to QueryBuilder's list of filters
      var options = {allowEmpty: true, filters: []}
      $.each(newVal, function(i, v){
        if(integer_columns.split(',').indexOf(v._1) !== -1){
          options.filters.push({id: v._1, type: 'integer'});
        } else if(v._1.indexOf("DATE") !== -1) {
          options.filters.push({id: v._1, type: 'date'})
        } else { 
          options.filters.push({id: v._1, type: 'string'});
        }
      });
      qb.queryBuilder(options);
    });
  });
</script>
<input type="text" ng-model="numberColumns" id="numCol"></input>
<form class="form-inline">
 <div class="form-group">
 Please select table: Select Columns:<br>
 <select size=5 ng-model="selectedTable" ng-options="o as o for o in tables" 
       data-ng-change="z.runParagraph('20180109-151738_134370871')"></select>
 <select size=5 multiple ng-model="selectedColumns" ng-options="o as o for o in columns">
 <option value="*">*</option>
 </select>
 Sum Column:
 <select ng-model="selectedCount" ng-options="o as o for o in columns">
 <option value="*">*</option>
 </select>
 <label for="limitId">Limit: </label> <input type="text" class="form-control" 
       id="limitId" placeholder="Limit Rows" ng-model="limit"></input>
 </div>
</form>
<div id="builder"></div>
<button type="submit" id="generateQuery" class="btn btn-primary" 
       ng-click="z.runParagraph('20180109-132517_167004794')">Run Query</button>
<input id="whereClause" type="text" ng-model="clause" class="hide"></input>

<h3>Query: select {{selectedColumns.toString()}} from {{selectedTable}} where {{clause}} 
   with a sum on: {{selectedCount}} </h3>

Paragraph two uses the jQuery and jQuery QueryBuilder javascript libraries. In the z.runParagraph command use the paragraph id from paragraph three.
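If you are not sure what a paragraph's id is, one way to find it (a small sketch, not part of the original note) is to print it from inside the paragraph in question and paste the value into the z.runParagraph and z.run calls:

%spark
// run this inside the target paragraph to print its id
println(z.getInterpreterContext().getParagraphId())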

Paragraph Three

The results of the query show up in this paragraph. Its function is to generate the query and run it for display.

%spark
import scala.collection.mutable.ArrayBuffer

var selected_count_column = z.angular("selectedCount").asInstanceOf[String]
var selected_columns = z.angular("selectedColumns").asInstanceOf[Array[String]]
var limit = z.angular("limit").asInstanceOf[String]
var limit_clause = ""
if (limit != "*") {
 limit_clause = "limit " + limit
}
val countColumn = z.angular("selectedCount")
var selected_columns_n = selected_columns.toBuffer
// remove from list of columns
selected_columns_n -= selected_count_column

if (countColumn != "*") {
 val query = "select "+ selected_columns_n.mkString(",") + ", sum(" + selected_count_column +
     ") "+ selected_count_column +"_SUM from " + z.angular("selectedTable") + " where " + 
      z.angular("clause") + " group by " + selected_columns_n.mkString(",") + " " + 
      limit_clause
 println(query)
 z.show(sqlContext.sql(query))
} else {
 val query2 = "select "+ selected_columns.mkString(",") +" from " + z.angular("selectedTable") + 
      " where " + z.angular("clause") + " " + limit_clause
 println(query2)
 z.show(sqlContext.sql(query2))
}

Now if everything is just right you will be able to query your tables without writing SQL. This is a limited example as I have not provided options for different types of aggregation, advanced grouping or joins for multiple tables.
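As one possible extension (a sketch that reuses the binding pattern from paragraph one, not code from the working note), the aggregate function could be made selectable in the same way the other inputs are and substituted into the query built in paragraph three:

%spark
// sketch: let the user pick the aggregate function as well as the column
val aggFunctions = Array("sum", "avg", "min", "max", "count")
z.angularBind("aggFunctions", aggFunctions)
var aggFunction = "sum"
z.angularBind("selectedAgg", aggFunction)
z.angularUnwatch("selectedAgg")
z.angularWatch("selectedAgg", (before: Object, after: Object) => {
  aggFunction = after.asInstanceOf[String]
})
// paragraph three would then use aggFunction in place of the hard coded sum:
// "select " + selected_columns_n.mkString(",") + ", " + aggFunction + "(" +
//   selected_count_column + ") " + selected_count_column + "_AGG from " + ...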

 

Please follow us on our website at https://volumeintegration.com and on twitter at volumeint.

Using Zeppelin to Explore a Database

In attempting to use Apache Zeppelin I found it difficult to simply explore a new database. This was the situation when connecting a SAP HANA database to Apache Zeppelin using the JDBC driver.

So I created a Zeppelin interface that can be used by a person who does not know how to code or use SQL.

This is a note with code in multiple paragraphs that allows a person to see a list of all the tables in the database, view the structure of each one, and look at a sample of the data in each table.

Volume Analytics Table Explorer – HANA & Zeppelin

When using a standard database with Apache Zeppelin one needs to register each table into Spark so that it can query it and make DataFrames from the native tables. I got around this by allowing the user to choose the tables they want to register into Apache Zeppelin and Spark. This registration uses the createOrReplaceTempView function on a DataFrame, which allows us to retain the speed of HANA without copying all the data into a Spark table.

The video shows a short demonstration of how this works.

Once tables are registered as Spark views they can be used by all the other notes on the Apache Zeppelin server. This means that other users can leverage the tables without knowing they came from the HANA database.

The code is custom to HANA because of the names of the system tables where it stores the lists of tables and column names. The code also converts HANA specific data types such as ST_POINT to comma delimited strings.
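For example, for a hypothetical FFA.FLIGHTS table with an ST_POINT column named LOCATION, the loadSparkTable function defined in the third paragraph below would generate a pushdown subquery along these lines (FLIGHTS and TAIL_NUMBER are placeholder names):

// illustrative only
val tableSql = "(select LOCATION.st_y()|| ',' || LOCATION.st_x() LOCATION, TAIL_NUMBER from FFA.FLIGHTS) FLIGHTS"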

This example of dynamic forms was informed by Data-Driven Dynamic Forms in Apache Zeppelin.

The Code

Be aware this is prototype code that works on Zeppelin 0.8.0 Snapshot which as of today needs to be built from source. It is pre-release.

First Paragraph

In the first paragraph I am loading the HANA jdbc driver. You can avoid doing this by adding your jdbc jar to the dependencies section of the interpreter configuration, as laid out in How to Use Zeppelin With SAP HANA.

%dep
z.reset() 
z.load("/projects/zeppelin/interpreter/jdbc/ngdbc.jar")

Second Paragraph

In the second paragraph we build the DataFrames from the tables in HANA that contain the list of tables and columns in the database. These will be used to show the user what tables and columns are available for data analysis.

%spark
import org.apache.spark.sql._
val driver ="com.sap.db.jdbc.Driver"
val url="jdbc:sap://120.12.83.105:30015/ffa"
val database = "dbname"
val username = "username"
val password = "password"
// type in the schemas you wish to expose
val tables = """(select * from tables where schema_name in ('FFA', 'SCHEMA_B')) a """
val columns = """(select * from table_columns where schema_name in ('FFA', 'SCHEMA_B')) b """

val jdbcDF = sqlContext.read.format("jdbc").option("driver",driver)
 .option("url",url)
 .option("databaseName", database)
 .option("user", username)
 .option("password",password)
 .option("dbtable", tables).load()
jdbcDF.createOrReplaceTempView("tables")

val jdbcDF2 = sqlContext.read.format("jdbc").option("driver",driver)
 .option("url",url)
 .option("databaseName", database)
 .option("user", username)
 .option("password",password)
 .option("dbtable", columns).load()
jdbcDF2.createOrReplaceTempView("table_columns")

Third Paragraph

The third paragraph contains the functions that the fourth paragraph will call. These functions return the column names and types when given a table name, and include the function that loads a HANA table into a Spark table view.

%spark
//Get list of distinct values on a column for given table
def distinctValues(table: String, col: String) : Array[(String, String)] = {
 sqlContext.sql("select distinct " + col + " from " + table + " order by " + col).collect.map(x => (x(0).asInstanceOf[String], x(0).asInstanceOf[String]))
}

def distinctWhere(table: String, col: String, schema: String) : Array[(String, String)] = {
 var results = sqlContext.sql("select distinct " + col + " from " + table + " where schema_name = '" + schema +"' order by " + col)
 results.collect.map(x => (x(0).asInstanceOf[String], x(0).asInstanceOf[String]))
}

//Get list of tables
def tables(): Array[(String, String)] = {
 sqlContext.sql("show tables").collect.map(x => (x(1).asInstanceOf[String].toUpperCase(), x(1).asInstanceOf[String].toUpperCase()))
}

//Get list of columns on a given table
def columns(table: String) : Array[(String, String)] = {
 sqlContext.sql("select * from " + table + " limit 0").columns.map(x => (x, x))
}

def hanaColumns(schema: String, table: String): Array[(String, String)] = {
 sqlContext.sql("select column_name, data_type_name from table_columns where schema_name = '"+ schema + "' and table_name = '" + table+"'").collect.map(x => (x(0).asInstanceOf[String], x(1).asInstanceOf[String]))
}

//load table into spark
def loadSparkTable(schema: String, table: String) : Unit = {
  var columns = hanaColumns(schema, table)
  var tableSql = "(select "
  for (c <- columns) {
    // If this column is a geo datatype convert it to a string
    if (c._2 == "ST_POINT" || c._2 == "ST_GEOMETRY") {
      tableSql = tableSql + c._1 + ".st_y()|| ',' || " + c._1 + ".st_x() " + c._1 + ", "
    } else {
      tableSql = tableSql + c._1 + ", "
    }
  }
 tableSql = tableSql.dropRight(2)
 tableSql = tableSql + " from " + schema +"."+table+") " + table

 val jdbcDF4 = sqlContext.read.format("jdbc").option("driver",driver)
  .option("url",url)
  .option("databaseName", "FFA")
  .option("user", username)
  .option("password", password)
  .option("dbtable", tableSql).load()
  jdbcDF4.createOrReplaceTempView(table)
 
}

//Wrapper for printing any DataFrame in Zeppelin table format
def printQueryResultsAsTable(query: String) : Unit = {
 val df = sqlContext.sql(query)
 print("%table\n" + df.columns.mkString("\t") + '\n'+ df.map(x => x.mkString("\t")).collect().mkString("\n")) 
}

def printTableList(): Unit = {
 println(sqlContext.sql("show tables").collect.map(x => (x(1).asInstanceOf[String])).mkString("%table\nTables Loaded\n","\n","\n"))
}

// this part keeps a list of the tables that have been registered for reference
val aRDD = sc.parallelize(sqlContext.sql("show tables").collect.map(x => (x(1).asInstanceOf[String])))
val aDF = aRDD.toDF()
aDF.registerTempTable("tables_loaded")

Fourth Paragraph

The fourth paragraph contains the Spark code needed to produce the interface with select lists for picking the tables. It uses dynamic forms as described in the Zeppelin documentation and illustrated in more detail by Rander Zander.

%spark
val schema = z.select("Schemas", distinctValues("tables","schema_name")).asInstanceOf[String]
var table = z.select("Tables", distinctWhere("tables", "table_name", schema)).asInstanceOf[String]
val options = Seq(("yes","yes"))
val load = z.checkbox("Register & View Data", options).mkString("")

val query = "select column_name, data_type_name, length, is_nullable, comments from table_columns where schema_name = '" + schema + "' and table_name = '" + table + "' order by position"
val df = sqlContext.sql(query)


if (load == "yes") { 
 if (table != null && !table.isEmpty()) {
   loadSparkTable(schema, table)
   z.run("20180108-113700_1925475075")
 }
}

if (table != null && !table.isEmpty()) {
 println("%html <h1>"+schema)
 println(table + "</h1>")
 z.show(df)
} else {
 println("%html <h1>Pick a Schema and Table</h1>")
}

As the user changes the schema select list, the functions in paragraph three are called and the tables select list is populated with that schema's tables. When they select a table, the paragraph refreshes with a table showing details about the table's columns, such as their types and sizes.

When they select the Register & View Data checkbox the table is turned into a Spark view and paragraph five displays the contents of the table. Note the z.run command: it runs a specific paragraph, and you need to put in your own value here. This should be the paragraph id of the next paragraph, which is paragraph five.

Paragraph Five

%spark
z.show(sql("select * from " + table +" limit 100"))

The last paragraph lists the first 100 rows of the table that has been selected with Register & View Data checked.

Slight modifications of this code will allow the same sort of interface to be built for MySQL, Postgres, Oracle, MS-SQL or any other database.
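As a rough illustration of those modifications (untested, with placeholder connection details), the second paragraph for PostgreSQL could draw the table and column lists from information_schema instead of the HANA system views:

%spark
val driver = "org.postgresql.Driver"
val url = "jdbc:postgresql://dbhost:5432/mydb"
val username = "username"
val password = "password"
// information_schema plays the role of HANA's tables / table_columns views
val tables = """(select table_name, table_schema as schema_name
                 from information_schema.tables where table_schema in ('public')) a"""
val columns = """(select table_name, column_name, data_type as data_type_name
                  from information_schema.columns where table_schema in ('public')) b"""

val jdbcDF = sqlContext.read.format("jdbc").option("driver", driver)
  .option("url", url).option("user", username).option("password", password)
  .option("dbtable", tables).load()
jdbcDF.createOrReplaceTempView("tables")

val jdbcDF2 = sqlContext.read.format("jdbc").option("driver", driver)
  .option("url", url).option("user", username).option("password", password)
  .option("dbtable", columns).load()
jdbcDF2.createOrReplaceTempView("table_columns")

The fourth paragraph's detail query also references length, is_nullable, comments and position, so those columns would need equivalents from information_schema as well.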

Now go to SAP HANA Query Builder On Apache Zeppelin Demo and you will find code to build a simple query builder note.

Please let us know on twitter, facebook and LinkedIn if this helps you or if you find a better way to do this in Zeppelin.


Zeppelin Maps the Hard Way

In Zeppelin Maps the Easy Way I showed how to add a map to Zeppelin with a Helium module. But what if you do not have access to the Helium NPM server to load in that module? And what if you want to add features to your Leaflet Map that are not supported in the volume-leaflet package?

This post will show you how the Angular javascript library allows you to add a map user interface to a Zeppelin paragraph.

Zeppelin Angular Leaflet Map

Zeppelin Angular Leaflet Map with Markers

First we want to get a map on the screen with markers.

In Zeppelin create a new note.

As was shown in How to Use Zeppelin With SAP HANA we create a separate paragraph to build the database connection. Please substitute in your own database driver and connection string to make it work for other databases. There are other examples where you can pull in data from a csv file and turn it into a table object.
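A minimal sketch of that csv alternative (the file path is a placeholder, and the file is assumed to have the comments, lat and lng columns used by the query below) looks like this:

%spark
// build event_view from a csv file instead of a HANA table
val eventsDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/path/to/events.csv")
eventsDF.createOrReplaceTempView("event_view")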

In the next paragraph we place the spark scala code to query the database and build the markers that will be passed to the final paragraph which is built with angular.

The data query paragraph has a basic way to query a bounding box. It just looks for coordinates that are greater and less than the northwest and southeast corners of a bounding box.

var sql1 = "select comments desc, lat, lng from EVENT_VIEW "
if (box.length > 0) {
var coords = box.split(",")
sql1 = sql1 + " where lng > " + coords(0).toFloat + " and lat > " + coords(1).toFloat + " and lng < " + coords(2).toFloat + " and lat < " + coords(3).toFloat
}

var sql = sql1 +" limit 20"
val map_pings = jdbcDF.sqlContext.sql(sql)
z.angularBind("locations", map_pings.collect()) 

The data from this query is used to make map_pings, which is bound to Angular so that any Angular code can reference it. Zeppelin has the ability to bind data into other languages so it can be used by different paragraphs in the same note. There are samples for other databases, json and csv files at this link.
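As a minimal illustration of that binding mechanism (not part of the map note itself), a value bound in a Scala paragraph can be read directly in an %angular paragraph:

%spark
z.angularBind("greeting", "hello from spark")

%angular
<h3>{{greeting}}</h3>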

We do not have access to the HANA proprietary functions because Zeppelin loads the data up in its own table view of the HANA table. We are using the createOrReplaceTempView command so that a copy of the data is not made in Zeppelin; it just passes the data through.

Note that you should set up the HANA jdbc driver as described in How to Use Zeppelin With SAP HANA.

It is best if you set up a dependency to the HANA jdbc jar in the Spark interpreter. Go to the Zeppelin settings menu.

Zeppelin Settings Menu

Pick the Interpreter and find the Spark section and press edit.

Zeppelin Interpreter Screen

Then add the path where you have the SAP HANA jdbc driver, ngdbc.jar, installed.

Configure HANA jdbc in Spark Interpreter

First Paragraph

%spark
import org.apache.spark.sql._
val driver ="com.sap.db.jdbc.Driver"
val url="jdbc:sap://11.1.88.110:30015/tri"
val database   = "database schema"   
val username   = "username for the database"
val password   = "the Password for the database"
val table_view = "event_view"
var box=""
val jdbcDF = sqlContext.read.format("jdbc").option("driver",driver)
                                           .option("url",url)
                                           .option("databaseName", database)
                                           .option("user", username)
                                           .option("password",password)
                                           .option("dbtable", table_view).load()
jdbcDF.createOrReplaceTempView("event_view")

Second Paragraph

%spark

var box = "20.214843750000004,1.9332268264771233,42.36328125000001,29.6880527498568";
var sql1 = "select comments desc, lat, lng from EVENT_VIEW "
if (box.length > 0) {
    var coords = box.split(",")
    sql1 = sql1 + " where lng  > " + coords(0).toFloat + " and lat > " +  
        coords(1).toFloat + " and lng < " + coords(2).toFloat + " and lat < " +
        coords(3).toFloat
}
var sql = sql1 +" limit 20" 

val map_pings = jdbcDF.sqlContext.sql(sql)
z.angularBind("locations", map_pings.collect())
z.angularBind("paragraph", z.getInterpreterContext().getParagraphId())
// get the paragraph id of the the angular paragraph and put it below
z.run("20171127-081000_380354042")

Third Paragraph

In the third paragraph we add the angular code with the %angular directive. Note the for each loop section where it builds the markers and adds them to the map.

%angular 
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/leaflet/0.7.5/leaflet.css" />
.
<div id="map" style="height: 300px; width: 100%"></div>
<script type="text/javascript">
function initMap() {
    var element = $('#textbox');
    var map = L.map('map').setView([30.00, -30.00], 3);
   
    L.tileLayer('http://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png').addTo(map);
    var geoMarkers = L.layerGroup().addTo(map);
    
    var el = angular.element($('#map').parent('.ng-scope'));
    var $scope = el.scope().compiledScope;
   
    angular.element(el).ready(function() {
        window.locationWatcher = $scope.$watch('locations', function(newValue, oldValue) {
            //geoMarkers.clearLayers();
            angular.forEach(newValue, function(event) {
                if (event)
                  var marker = L.marker([event.values[1], event.values[2]]).bindPopup(event.values[0]).addTo(geoMarkers);
            });
        })
    });
}
if (window.locationWatcher) { window.locationWatcher(); }

// ensure we only load the script once, seems to cause issues otherwise
if (window.L) {
    initMap();
} else {
    console.log('Loading Leaflet library');
    var sc = document.createElement('script');
    sc.type = 'text/javascript';
    sc.src = 'https://cdnjs.cloudflare.com/ajax/libs/leaflet/0.7.5/leaflet.js';
    sc.onload = initMap; // initialize the map once Leaflet has loaded
    sc.onerror = function(err) { alert(err); }
    document.getElementsByTagName('head')[0].appendChild(sc);
}
</script>
<p>Testing the Map</p>

<form class="form-inline">
  <div class="form-group">
    <input id="textbox" ng-model="box" data-ng-change="z.runParagraph(paragraph);"></input>
    <label for="paragraphId">Paragraph Id: </label>
    <input type="text" class="form-control" id="paragraphId" placeholder="Paragraph Id ..." ng-model="paragraph"></input>
  </div>
  <button type="submit" class="btn btn-primary" ng-click="z.runParagraph(paragraph)"> Run Paragraph</button>
</form>

Now when you run the three paragraphs in order it should produce a map with markers on it.

The next step is to add a way to query the database by drawing a box on the screen. Into the scala / spark code we add a variable for the bounding box with the z.angularBind() command. Then a watcher is made to see when this variable changes so the new value can be used to run the query.

Modify Second Paragraph

%spark
z.angularBind("box", box)
// Get the bounding box
z.angularWatch("box", (oldValue: Object, newValue: Object) => {
    println(s"value changed from $oldValue to $newValue")
    box = newValue.asInstanceOf[String]
})

var sql1 = "select comments desc, lat, lng from EVENT_VIEW "
if (box.length > 0) {
    var coords = box.split(",")
    sql1 = sql1 + " where lng  > " + coords(0).toFloat + " and lat > " +  coords(1).toFloat + " and lng < " + coords(2).toFloat + " and lat < " +  coords(3).toFloat
}
var sql = sql1 +" limit 20" 

val map_pings = jdbcDF.sqlContext.sql(sql)
z.angularBind("locations", map_pings.collect())
z.angularBind("paragraph", z.getInterpreterContext().getParagraphId())
z.run("20171127-081000_380354042") // put the paragraph id for your angular paragraph here

To the angular section we need to add in an additional leaflet library called leaflet.draw. This is done by adding an additional css link and a javascript script. Then the draw controls are added as shown in the code below.

Modify the Third Paragraph

%angular 
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/leaflet/0.7.5/leaflet.css" />
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/leaflet.draw/0.4.13/leaflet.draw.css" />
.
<script src='https://cdnjs.cloudflare.com/ajax/libs/leaflet.draw/0.4.13/leaflet.draw.js'></script>
<div id="map" style="height: 300px; width: 100%"></div>

<script type="text/javascript">
function initMap() {
    var element = $('#textbox');
    var map = L.map('map').setView([30.00, -30.00], 3);
   
    L.tileLayer('http://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png').addTo(map);
    var geoMarkers = L.layerGroup().addTo(map);
    var drawnItems = new L.FeatureGroup();
    
    map.addLayer(drawnItems);
    
    var drawControl = new L.Control.Draw({
        draw: {
             polygon: false,
             marker: false,
             polyline: false
        },
        edit: {
            featureGroup: drawnItems
        }
    });
    map.addControl(drawControl);
    
    map.on('draw:created', function (e) {
        var type = e.layerType;
        var layer = e.layer;
        drawnItems.addLayer(layer);
        element.val(layer.getBounds().toBBoxString());
        map.fitBounds(layer.getBounds());
        window.setTimeout(function(){
           //Triggers Angular to do its thing with changed model values
           element.trigger('input');
        }, 500);
    });
    
    var el = angular.element($('#map').parent('.ng-scope'));
    var $scope = el.scope().compiledScope;
   
    angular.element(el).ready(function() {
        window.locationWatcher = $scope.$watch('locations', function(newValue, oldValue) {
            $scope.latlng = [];
            angular.forEach(newValue, function(event) {
                if (event)
                  var marker = L.marker([event.values[1], event.values[2]]).bindPopup(event.values[0]).addTo(geoMarkers);
                  $scope.latlng.push(L.latLng(event.values[1], event.values[2]));
            });
            var bounds = L.latLngBounds($scope.latlng)
            map.fitBounds(bounds)
        })
    });

}

if (window.locationWatcher) { window.locationWatcher(); }

// ensure we only load the script once, seems to cause issues otherwise
if (window.L) {
    initMap();
} else {
    console.log('Loading Leaflet library');
    var sc = document.createElement('script');
    sc.type = 'text/javascript';
    sc.src = 'https://cdnjs.cloudflare.com/ajax/libs/leaflet/0.7.5/leaflet.js';
    sc.onload = initMap; // initialize the map once Leaflet has loaded
    sc.onerror = function(err) { alert(err); }
    document.getElementsByTagName('head')[0].appendChild(sc);
}
</script>
<p>Testing the Map</p>

<form class="form-inline">
  <div class="form-group">
    <input id="textbox" ng-model="box" data-ng-change="z.runParagraph(paragraph);"></input>
    <label for="paragraphId">Paragraph Id: </label>
    <input type="text" class="form-control" id="paragraphId" placeholder="Paragraph Id ..." ng-model="paragraph"></input>
  </div>
  <button type="submit" class="btn btn-primary" ng-click="z.runParagraph(paragraph)"> Run Paragraph</button>
</form>

There are some important features to mention here that took some investigation to figure out.

Within Zeppelin I was unable to get the box being drawn to remain visible. So instead, drawing a box causes the map to zoom to the selected area by utilizing this code:
element.val(layer.getBounds().toBBoxString());
map.fitBounds(layer.getBounds());

To make the map zoom back to the area after the query is run this code is triggered.

$scope.latlng.push(L.latLng(event.values[1], event.values[2]))
...
var bounds = L.latLngBounds($scope.latlng)
map.fitBounds(bounds)

To trigger the spark / scala paragraph to run after drawing a box, this attribute causes the query paragraph to run: data-ng-change="z.runParagraph(paragraph);"

<input id="textbox" ng-model="box" data-ng-change="z.runParagraph(paragraph);"></input>

The html form at the bottom is what holds and binds the data back and forth between the paragraphs. It is visible for debugging at the moment.
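Once debugging is done, one option (mirroring the hidden whereClause input in the query builder note) is to hide the bounding box field while keeping the binding intact:

<input id="textbox" ng-model="box" class="hide" data-ng-change="z.runParagraph(paragraph);"></input>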

Query of a geographic region with Zeppelin

Query of a geographic region

Please let us know how it works out for you. Hopefully this will help you add maps to your Zeppelin notebook. I am sure there are many other better ways to accomplish this feature set but this is the first way I was able to get it all to work together.

You can contact us using twitter at @volumeint.

Some code borrowed from: https://gist.github.com/granturing/a09aed4a302a7367be92 and https://zeppelin.apache.org/docs/latest/displaysystem/front-end-angular.html

Ryft and Apache Zeppelin

Ryft

Ryft is an FPGA (field programmable gate array) appliance that allows for hosting and searching data quickly. In this post I will show one way to connect up Apache Zeppelin for use in data analysis using Scala code. Previously I showed how to connect Apache Zeppelin to SAP HANA.

The Ryft can quickly search structured and unstructured data without needing to build an index. This ability is attributed to the FPGA, which can filter data on demand. It uses its four internal FPGA modules to process the data at search time. Other types of search systems like Elasticsearch, Solr, Lucene or a database have to build and store an index of the data. Ryft operates without an index.

Ryft Speed Comparison

I have populated my Ryft with a cache of data from Enron. It is a dump of Enron emails obtained from Carnegie Mellon. This was as simple as uploading files to the Ryft and running a command like this:

ryftutil -copy "enron*" -c enron_email -a ryft.volumeintegration.com:8765

In the Zeppelin interface I will be able to search for keywords or phrases in the email files and display them. The size of the Enron e-mail archive is 20 megabytes.

Ryft One Appliance

Apache Zeppelin

Apache Zeppelin is an open source web notebook that allows a person to write code in many languages to manipulate and visualize data.

Apache Zeppelin with Volume Analytics Interface

To make Apache Zeppelin work with Ryft I installed Apache Zeppelin onto the Ryft appliance and connected the Spark Ryft Connector jar found at this git project, or you can download a prebuilt jar.

Follow the directions provided at the spark-ryft-connector project to compile the jar file needed. I compiled the jar file on my local desktop computer and placed the spark-ryft-connector jar file onto the Ryft machine. I did run into one issue that was not documented: the Ryft connector was not working properly and gave the error "java.lang.NoClassDefFoundError: org/apache/spark/Logging".

I resolved this by downloading spark-core_2.11-1.5.2.logging.jar from https://raw.githubusercontent.com/swordsmanliu/SparkStreamingHbase/master/lib/spark-core_2.11-1.5.2.logging.jar and putting it in the zeppelin/interpreter/spark/dep directory.

Now you can create a note in Zeppelin. I am using the Spark interpreter which allows you to write the code in Scala.

First you have to make sure Zeppelin can use the ryft code in the jar file. Make a dependency paragraph with this code:

%dep
z.reset()
z.load("/home/ryftuser/spark-ryft-connector-2.10.6-0.9.0.jar")

Ryft Query

Now make a new paragraph with the code to make form fields and run the Ryft API commands to perform a search. Figuring these queries out takes a detailed study of the documentation.

These are the commands to prepare and run the query. I show a simple search, a fuzzy hamming search and a fuzzy edit distance search. The Ryft can perform very fast fuzzy searches with wide edit distances because there is not an index being built.

Simple Query

queryOptions = RyftQueryOptions("enron_email", "line", 0 toByte)
query = SimpleQuery(searchFor.toString)

Hamming Query

queryOptions = RyftQueryOptions("enron_email", surrounding.toString.toInt, distance.toString.toByte, fhs)

Edit Distance Query

queryOptions = RyftQueryOptions("enron_email", "line", distance.toString.toByte)

The Search

var searchRDD = sc.ryftRDD(Seq(query), queryOptions)

This produces an RDD that can be manipulated to view the contents using code like the example below.

searchRDD.asInstanceOf[RyftRDD[RyftData]].collect.foreach { ryftData =>
   println(ryftData.offset)
   println(ryftData.length)
   println(ryftData.fuzziness)
   println(ryftData.data.replace("\n", " "))
   println(ryftData.file)
}

The Result in Zeppelin

Result of Searching Ryft with Zeppelin

In addition I have included code that allows the user to click on Show File to see the original e-mail with the relevant text highlighted in bold.

Results in Bold

I installed Apache Zeppelin in a way that allows it access to a portion of the file system on the server where I stored the original copy of the email files.

In order for Apache Zeppelin to display the original email, I had to give it access to the part of the filesystem where the original emails were stored. Ryft uses a catalog of the emails to perform searches, as it performs better when searching fewer, larger files rather than many smaller ones. The catalog feature allows it to combine many small files into one large file.

The search results return a filename and offset which Apache Zeppelin uses to retrieve the relevant file and highlight the appropriate match. 

In the end, Ryft found all instances of the name Mohammad with various spelling differences in 0.148 seconds in a dataset of 30 megabytes. When I performed the same search on 48 gigabytes of data it ran in 5.89 seconds; 94 gigabytes took 12.274 seconds, and 102 gigabytes took 13 seconds. These are just quick sample numbers using dumps of many files. Perhaps performance could be improved by consolidating the small files into catalogs.
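Working the larger runs out, 48 gigabytes in 5.89 seconds, 94 gigabytes in 12.274 seconds and 102 gigabytes in 13 seconds all come to roughly 8 gigabytes scanned per second, so search time grows about linearly with the size of the data rather than depending on a prebuilt index.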

Zeppelin Editor

The code is edited in Zeppelin itself.

Code in Zeppelin

You edit the code in the web interface, but Zeppelin can hide the code once you have the form fields working. Here is the part of the code that produces the form fields:

 val searchFor = z.input("Search String", "mohammad")
 val distance = z.input("Search Distance", 2)
 var queryType = z.select("Query Type", Seq(("1","Simple"),("2","Hamming"),("3","Edit Distance"))).toString
 var surrounding = z.input("Surrounding", "line")

So in the end we end up with the following code.

%spark
import com.ryft.spark.connector._
import com.ryft.spark.connector.domain.RyftQueryOptions
import com.ryft.spark.connector.query.SimpleQuery
import com.ryft.spark.connector.query.value.{EditValue, HammingValue}
import com.ryft.spark.connector.rdd.RyftRDD
import com.ryft.spark.connector.domain.{fhs, RyftData, RyftQueryOptions}
import scala.language.postfixOps
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import scala.io.Source

def isEmpty(x: String) = x == null || x.isEmpty
  var queryOptions = RyftQueryOptions("enron_email", "line", 0 toByte)
  val searchFor = z.input("Search String", "mohammad")
  val distance = z.input("Search Distance", 2)
  var queryType = z.select("Query Type",("2","Hamming"), Seq(("1","Simple"),("2","Hamming"),("3","Edit Distance"))).toString
  var surrounding = z.input("Surrounding", "line")
  var query = SimpleQuery(searchFor.toString)

 
  if (isEmpty(queryType)) {
      queryType = "2"
  }

  if (queryType.toString.toInt == 1) {
        println("simple")
        if (surrounding == "line") {
            queryOptions = RyftQueryOptions("enron_email", "line", 0 toByte)
        } else {
            queryOptions = RyftQueryOptions("enron_email", surrounding.toString.toInt, 0 toByte)
        }
        query = SimpleQuery(searchFor.toString)

  } else if (queryType.toString.toInt ==2) {
        println("hamming")
        if (surrounding == "line") {
            queryOptions = RyftQueryOptions("enron_email", "line", distance.toString.toByte, fhs)
        } else {
            queryOptions = RyftQueryOptions("enron_email", surrounding.toString.toInt, distance.toString.toByte, fhs)
        }
  } else {
        println("edit")
        if (surrounding == "line") {
            queryOptions = RyftQueryOptions("enron_email", "line", distance.toString.toByte)
        } else {
            queryOptions = RyftQueryOptions("enron_email", surrounding.toString.toInt, distance.toString.toByte)
        }
  }

  var searchRDD = sc.ryftRDD(Seq(query), queryOptions)
  var count = searchRDD.count()

  print(s"%html <h2>Count: $count</h2>")

  if (count > 0) {
        println(s"Hamming search RDD first: ${searchRDD.first()}")
        println(searchRDD.count())
        print("%html <table>")
        print("<script>")
        println("function showhide(id) { var e = document.getElementById(id); e.style.display = (e.style.display == 'block') ? 'none' : 'block';}")
        print("</script>")
        print("<tr><td>File</td><td>Data</td></tr>")

        searchRDD.asInstanceOf[RyftRDD[RyftData]].collect.foreach { ryftData =>
            print("<tr><td style='width:600px'><a href=javascript:showhide('"+ryftData.file+"')>Show File </a></td>")
            val x = ryftData.data.replace("\n", " ")
            print(s"<td> $x</td></tr>")
            println("<tr id="+ ryftData.file +" style='display:none;'>")
            println("<td style='width:600px'>")

            val source = Source.fromFile("/home/ryftuser/maildir/"+ryftData.file)
            var theFile = try source.mkString finally source.close()
            var newDoc = ""
            var totalCharCount = 0
            var charCount = 0
            for (c <- theFile) {
                charCount = charCount + 1
                if (totalCharCount + charCount == ryftData.offset) {
                    newDoc = newDoc+"<b>"
                } else if (totalCharCount+charCount == ryftData.offset+ryftData.length+1) {
                    newDoc = newDoc+"</b>"
                }
                newDoc = newDoc+c
            }
            print(newDoc.replace("\n", "<br>"))
            totalCharCount = totalCharCount + charCount
            println("</td>")
            println("</tr>")
        }
        print("</table>")
    }

So this should get you started on being able to search data with Zeppelin and Ryft. You can use this interface to experiment with the different edit distances and search queries the Ryft supports. You can also implement additional methods to search by RegEx, IP addresses, dates and currency.

Please follow us on Facebook and on twitter at volumeint.