A Volume Analytics Flow for Finding Social Media Bots

Volume Analytics Chaos Control

Volume Analytics is a software tool used to build, deploy and manage data processing applications.

Volume Analytics is a scalable data management platform that allows the rapid ingestion, transformation, and loading of high volumes of data into multiple analytic models, as defined by your requirements or your existing data models.

Volume Analytics is a platform for streaming large volumes of varied data at high velocity.​

Volume Analytics is a tool that enables both rapid software development and operational maintainability, with scalability for high data volumes. Volume Analytics can be used for all of your data mining, fusion, extraction, transformation and loading needs. It has been used to mine and analyze social media feeds, monitor and alert on insider threats, and automate the search for cyber threats. In addition, it is being used to consolidate data from many data sources (databases, HDFS, file systems, data lakes) and to produce multiple data models for multiple data analytics visualization tools. It could also be used to consolidate sensor data from IoT devices or to monitor a SCADA industrial control network.

Volume Analytics makes it easy to quickly develop highly redundant software that is both scalable and maintainable. In the end you save money on labor for the development and maintenance of systems built with Volume Analytics.

In other words, Volume Analytics provides the plumbing of a data processing system. The application you are building has distinct units of work that need to be done. We might compare it to a water treatment plant. Dirty water comes into the system through a pipe and reaches a large contaminant filter. The filter is a work task and the pipe is a topic. Together they make a flow.

After the first filter, another pipe carries the water, minus the dirt, to the next water purification worker. In the water plant there is a dashboard for the managers to monitor the system and see whether they need to fix something or add more pipes and cleaning tasks.

Volume Analytics provides the pipes, a platform to run the worker tasks and a management tool to control the flow of data through the system.

A Volume Analytics Flow for Finding Social Media Bots

In addition, Volume Analytics has redundancy for disaster recovery, high availability and parallel processing. This is where our analogy fails. Data is duplicated across multiple topics, so the failure of a particular topic (pipe) does not destroy any data, because it is preserved on another topic. Topics are optimally set up in multiple data centers to maintain high availability.

In Volume Analytics, the water filters in the analogy are called tasks. Tasks are groups of code that perform some unit of work. Your specific application will have its own tasks. The tasks are deployed on more than one server in more than one data center.

Benefits

Faster start-up time saves money and time.

Volume Analytics allows a faster start-up time for a new application or system being built. The team does not need to build the platform that moves the data to tasks, and they do not need to build a monitoring system, as those features are included. Volume Analytics will also integrate with your current monitoring systems.

System is down less often

The DevOps team gets visibility into the system out of the box. They do not have to stand up a log search system, which saves time. They can see what is going on and fix it quickly.

Plan for Growth

As your data grows and the system needs to process more of it, Volume Analytics grows with it. Add server instances to increase the processing power; as work grows, Volume Analytics allocates work to the new instances. There is no re-coding needed, which saves time and money because developers are not needed to re-implement the code to work at a larger scale.

Less Disruptive Deployments

Construct your application in a way that allows new features to be deployed with a lower impact on features already in production. New code libraries and modules can be deployed to the platform and allowed to interact with the already running parts of the system without an outage. A built-in code library repository is included.

In addition, currently running flows can be terminated while the data waits on the topics for the newly programmed flow to be started.

This Flow processes files to find IP addresses, searches multiple APIs for matches and inserts data into a HANA database

A threat-search data processing flow in production. Each of the boxes is a task that performs a unit of work. The task puts the processed data on the topic represented by the star. Then the next task picks up the data and does another part of the job. The combination of a set of tasks and topics is a flow.

Geolocate IP Flow

An additional flow to geolocate IP addresses, added while the first flow is running.

Combined Flows

The combination of flows working together. The topic ip4-topic is an integration point.

Modular

Volume Analytics is modular and tasks are reusable. You can reconfigure your data processing pipeline without introducing new code. You can use tasks in more than one application.

Highly Available

Out of the box, Volume Analytics is highly available due to its built-in redundancy. Work tasks and topics (pipes) run in triplicate. As long as your compute instances are in multiple data centers, you will have redundancy built in. Volume Analytics knows how to balance the data between duplicates and avoid data loss if one or more work tasks fail; this extends to queuing up work if all work tasks fail.

Integration

Volume Analytics integrates with other products. It can retrieve and save data to other systems such as topics, queues, databases, file systems and data stores. In addition, these integrations happen over encrypted channels.

In our sample application, CyberFlow, there are many tasks that integrate with other systems. The read bucket task reads files from an AWS S3 bucket, the ThreatCrowd task calls the API at https://www.threatcrowd.org, and the Honeypot task calls https://www.projecthoneypot.org. The insert tasks then integrate with the SAP HANA database used in this example.

Volume Analytics integrates with your enterprise authentication and authorization systems like LDAP, Active Directory, CAP and more.

Data Management

Volume Analytics ingests datasets from throughout the enterprise, tracking each delivery and routing it through the system to extract the greatest benefit. It shares common capabilities such as text extraction, sentiment analysis, categorization, and indexing. A series of services makes those datasets discoverable and available to authorized users and other downstream systems.

Data Analytics

In addition to the management console, Volume Analytics comes with a notebook application. This allows a data scientist or analyst to discover data and turn it into information on reports. After your data is processed by Volume Analytics and put into a database, the notebook can be used to visualize the data. The data is sliced and diced and displayed on graphs, charts and maps.

Volume Analytics Notebook

Flow Control Panel

The flow control panel allows for control and basic monitoring of flows. Flows are groupings of tasks and topics working together. You can stop, start and terminate flows. From this screen you can also launch additional flow virtual machines when there is a heavy data processing load, and start up extra worker tasks as needed. There is also a link that lets you analyze the logs in Kibana.

Topic Control Panel

The topic control panel allows for the control and monitoring of topics. Monitor and delete topics from here.

Consumer Monitor Panel

The consumer monitor panel allows for the monitoring of consumer tasks. Consumer tasks are the tasks that read from a topic; they may also write to a topic. This screen lets you verify that messages are being processed and determine whether there is a lag in the processing.

Volume Analytics is used by our customers to process data from many data streams and data sources quickly and reliably. In addition, it has enabled the production of prototype systems that scale up into enterprise systems without rebuilding and re-coding the entire system.

And now this tour of Volume Analytics leads into a video demonstration of how it all works together.

Demonstration Video

This video further describes the features of Volume Analytics using an example application which parses IP addresses out of incident reports and searches other systems for indications of those IP addresses. The data is saved into a SAP HANA database.

Request a Demo Today

Volume Analytics is scalable, fast, maintainable and repeatable. Contact us to request a free demo and experience the power and efficiency of Volume Analytics today.

Contact

Volume Analytics Table Explorer - HANA & Zeppelin

Using Zeppelin to Explore a Database

While using Apache Zeppelin, I found it difficult to simply explore a new database. This was the situation when connecting a SAP HANA database to Apache Zeppelin using the JDBC driver.

So I created a Zeppelin interface that can be used by a person who does not know how to code or use SQL.

This is a note with code in multiple paragraphs that allows a person to see a list of all the tables in the database, view the structure of each table, and look at a sample of the data in each one.

Volume Analytics Table Explorer - HANA & Zeppelin

When using a standard database with Apache Zeppelin, one needs to register each table into Spark so that it can query it and make DataFrames from the native tables. I got around this by allowing the user to choose the tables they want to register into Apache Zeppelin and Spark. This registration uses the createOrReplaceTempView function on a DataFrame, which allows us to retain the speed of HANA without copying all the data into a Spark table.

The video shows a short demonstration of how this works.

Once tables are registered as Spark views they can be used by all the other notes on the Apache Zeppelin server. This means that other users can leverage the tables without knowing they came from the HANA database.
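For example, once a table has been registered, any other note on the server can query the view directly. A minimal sketch (INCIDENT_REPORTS is a hypothetical view name registered by this note):

%spark
// INCIDENT_REPORTS is a hypothetical view name registered by the Table Explorer note
z.show(sqlContext.sql("select count(*) from INCIDENT_REPORTS"))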

The code is custom to HANA because of the names of the system tables where it stores the lists of tables and column names. The code also converts HANA specific data types such as ST_POINT to comma delimited strings.

This example of dynamic forms was informed by Data-Driven Dynamic Forms in Apache Zeppelin.

The Code

Be aware this is prototype code that works on the Zeppelin 0.8.0 snapshot, which as of today needs to be built from source. It is pre-release.

First Paragraph

In the first paragraph I load the HANA JDBC driver. You can avoid doing this by adding your JDBC jar to the dependencies section of the interpreter configuration, as laid out in How to Use Zeppelin With SAP HANA.

%dep
// reset the interpreter dependencies and load the SAP HANA JDBC driver jar
z.reset()
z.load("/projects/zeppelin/interpreter/jdbc/ngdbc.jar")

Second Paragraph

In the second paragraph we build DataFrames from the HANA system tables that contain the list of tables and columns in the database. These will be used to show the user which tables and columns are available for data analysis.

%spark
import org.apache.spark.sql._
// HANA JDBC connection settings
val driver ="com.sap.db.jdbc.Driver"
val url="jdbc:sap://120.12.83.105:30015/ffa"
val database = "dbname"
val username = "username"
val password = "password"
// type in the schemas you wish to expose
val tables = """(select * from tables where schema_name in ('FFA', 'SCHEMA_B')) a """
val columns = """(select * from table_columns where schema_name in ('FFA', 'SCHEMA_B')) b """

val jdbcDF = sqlContext.read.format("jdbc").option("driver",driver)
 .option("url",url)
 .option("databaseName", database)
 .option("user", username)
 .option("password",password)
 .option("dbtable", tables).load()
jdbcDF.createOrReplaceTempView("tables")

val jdbcDF2 = sqlContext.read.format("jdbc").option("driver",driver)
 .option("url",url)
 .option("databaseName", database)
 .option("user", username)
 .option("password",password)
 .option("dbtable", columns).load()
jdbcDF2.createOrReplaceTempView("table_columns")

Third Paragraph

The third paragraph contains the Spark / Scala functions that the fourth paragraph will call. These functions return the column names and types when given a table name. It also has the function that loads a HANA table into a Spark table view.

%spark
//Get list of distinct values on a column for given table
def distinctValues(table: String, col: String) : Array[(String, String)] = {
 sqlContext.sql("select distinct " + col + " from " + table + " order by " + col).collect.map(x => (x(0).asInstanceOf[String], x(0).asInstanceOf[String]))
}

def distinctWhere(table: String, col: String, schema: String) : Array[(String, String)] = {
 var results = sqlContext.sql("select distinct " + col + " from " + table + " where schema_name = '" + schema +"' order by " + col)
 results.collect.map(x => (x(0).asInstanceOf[String], x(0).asInstanceOf[String]))
}

//Get list of tables
def tables(): Array[(String, String)] = {
 sqlContext.sql("show tables").collect.map(x => (x(1).asInstanceOf[String].toUpperCase(), x(1).asInstanceOf[String].toUpperCase()))
}

//Get list of columns on a given table
def columns(table: String) : Array[(String, String)] = {
 sqlContext.sql("select * from " + table + " limit 0").columns.map(x => (x, x))
}

def hanaColumns(schema: String, table: String): Array[(String, String)] = {
 sqlContext.sql("select column_name, data_type_name from table_columns where schema_name = '"+ schema + "' and table_name = '" + table+"'").collect.map(x => (x(0).asInstanceOf[String], x(1).asInstanceOf[String]))
}

//load table into spark
def loadSparkTable(schema: String, table: String) : Unit = {
  var columns = hanaColumns(schema, table)
  var tableSql = "(select "
  for (c <- columns) {
    // If this column is a geo datatype convert it to a string
    if (c._2 == "ST_POINT" || c._2 == "ST_GEOMETRY") {
      tableSql = tableSql + c._1 + ".st_y()|| ',' || " + c._1 + ".st_x() " + c._1 + ", "
    } else {
      tableSql = tableSql + c._1 + ", "
    }
  }
 tableSql = tableSql.dropRight(2)
 tableSql = tableSql + " from " + schema +"."+table+") " + table

 val jdbcDF4 = sqlContext.read.format("jdbc").option("driver",driver)
  .option("url",url)
  .option("databaseName", "FFA")
  .option("user", username)
  .option("password", password)
  .option("dbtable", tableSql).load()
  jdbcDF4.createOrReplaceTempView(table)
 
}

//Wrapper for printing any DataFrame in Zeppelin table format
def printQueryResultsAsTable(query: String) : Unit = {
 val df = sqlContext.sql(query)
 print("%table\n" + df.columns.mkString("\t") + '\n'+ df.map(x => x.mkString("\t")).collect().mkString("\n")) 
}

def printTableList(): Unit = {
 println(sqlContext.sql("show tables").collect.map(x => (x(1).asInstanceOf[String])).mkString("%table\nTables Loaded\n","\n","\n"))
}

// this part keeps a list of the tables that have been registered for reference
val aRDD = sc.parallelize(sqlContext.sql("show tables").collect.map(x => (x(1).asInstanceOf[String])))
val aDF = aRDD.toDF()
aDF.registerTempTable("tables_loaded")

Fourth Paragraph

The fourth paragraph contains the Spark code needed to produce the interface with select lists for picking the tables. It uses dynamic forms as described in the Zeppelin documentation and illustrated in more detail by Rander Zander.

%spark
val schema = z.select("Schemas", distinctValues("tables","schema_name")).asInstanceOf[String]
var table = z.select("Tables", distinctWhere("tables", "table_name", schema)).asInstanceOf[String]
val options = Seq(("yes","yes"))
val load = z.checkbox("Register & View Data", options).mkString("")

val query = "select column_name, data_type_name, length, is_nullable, comments from table_columns where schema_name = '" + schema + "' and table_name = '" + table + "' order by position"
val df = sqlContext.sql(query)


if (load == "yes") { 
 if (table != null && !table.isEmpty()) {
   loadSparkTable(schema, table)
   z.run("20180108-113700_1925475075")
 }
}

if (table != null && !table.isEmpty()) {
 println("%html <h1>"+schema)
 println(table + "</h1>")
 z.show(df)
} else {
 println("%html <h1>Pick a Schema and Table</h1>")
}

As the user changes the schema select list, the paragraph re-runs, calling the functions defined in paragraph three, and the tables select list is populated with the tables in the chosen schema. When they select a table, the paragraph refreshes with a table containing details about that table's columns, such as the column types and sizes.

When they select the Register & View Data checkbox, the table is registered as a Spark view and paragraph five displays the contents of the table. Note the z.run command: it runs a specific paragraph, and you need to put in your own value here. It should be the paragraph ID of the next paragraph, which is paragraph five.

Paragraph Five

%spark
z.show(sql("select * from " + table +" limit 100"))

The last paragraph lists the first 100 rows of the table that has been selected with Register & View Data checked.

Slight modifications of this code will allow the same sort of interface to be built for MySQL, Postgres, Oracle, MS-SQL or any other database.
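For example, here is a minimal sketch of how the second paragraph might look against PostgreSQL. Only the driver class, connection URL and catalog queries change (the host, database and schema names below are placeholders), and the column-detail query in the fourth paragraph and the geo-type handling in loadSparkTable would need similar small adjustments:

%spark
// Hypothetical PostgreSQL variant of the second paragraph.
// Requires the PostgreSQL JDBC jar on the interpreter classpath (via %dep or the interpreter dependencies).
// PostgreSQL keeps its catalog in information_schema rather than HANA's TABLES / TABLE_COLUMNS.
import org.apache.spark.sql._
val driver = "org.postgresql.Driver"
val url = "jdbc:postgresql://your-host:5432/yourdb"   // placeholder connection string
val username = "username"
val password = "password"
// type in the schemas you wish to expose
val tables = """(select table_schema as schema_name, table_name
                 from information_schema.tables
                 where table_schema in ('public')) a"""
val columns = """(select table_schema as schema_name, table_name, column_name,
                        data_type as data_type_name, is_nullable
                 from information_schema.columns
                 where table_schema in ('public')) b"""

val jdbcDF = sqlContext.read.format("jdbc").option("driver", driver)
 .option("url", url)
 .option("user", username)
 .option("password", password)
 .option("dbtable", tables).load()
jdbcDF.createOrReplaceTempView("tables")

val jdbcDF2 = sqlContext.read.format("jdbc").option("driver", driver)
 .option("url", url)
 .option("user", username)
 .option("password", password)
 .option("dbtable", columns).load()
jdbcDF2.createOrReplaceTempView("table_columns")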

Now go to SAP HANA Query Builder On Apache Zeppelin Demo and you will find code to build a simple query builder note.

Please let us know on Twitter, Facebook and LinkedIn if this helps you or if you find a better way to do this in Zeppelin.

JSON Joins - jq

Digital Cherries by Bradley Johnson (https://art.wowak.com), like little JSON objects

Manipulation of JSON files is an interesting challenge, but it is much easier than trying to manipulate XML files. I was given two files that were lists of JSON objects. The two files had a common key, business_id, that could be used to join them together. The objective was to flatten the two files into one file.

Once they are in one large file, we will use a system called Ryft to search the data. Ryft is an FPGA (Field Programmable Gate Array) system that searches data very quickly, with special capabilities for performing fuzzy searches with a large edit distance. I am hypothesizing that the Ryft will work better on flat data than on data that needs a join between two tables.

The files are from the Yelp dataset challenge. We will use a program called jq to join the data. The data files are yelp_business.json and yelp_review.json.

The file yelp_business.json has the format below, and each record / object is separated by a hard return. This file is 74 Megabytes. Note: I added hard returns between the fields to make it easier to read.
{
"business_id": "UsFtqoBl7naz8AVUBZMjQQ",
"full_address": "202 McClure St\nDravosburg, PA 15034",
"hours": {},
"open": true,
"categories": ["Nightlife"],
"city": "Dravosburg",
"review_count": 5,
"name": "Clancy's Pub",
"neighborhoods": [],
"longitude": -79.8868138,
"state": "PA",
"stars": 3.0,
"latitude": 40.3505527,
"attributes": {"Happy Hour": true, "Accepts Credit Cards": true, "Good For Groups": true, "Outdoor Seating": false, "Price Range": 1},
"type": "business"
}

The file yelp_review.json has the format below and is also separated by hard returns. For each business there are many reviews, so if a business has five reviews the final output will contain five rows of data for that one business. This file is 2.1 Gigabytes.

{
"votes": {"funny": 0, "useful": 0, "cool": 0},
"user_id": "uK8tzraOp4M5u3uYrqIBXg",
"review_id": "Di3exaUCFNw1V4kSNW5pgA",
"stars": 5,
"date": "2013-11-08",
"text": "All the food is great here. But the best thing they have is their wings. Their wings are simply fantastic!! The \"Wet Cajun\" are by the best & most popular. I also like the seasoned salt wings. Wing Night is Monday & Wednesday night, $0.75 whole wings!\n\nThe dining area is nice. Very family friendly! The bar is very nice is well. This place is truly a Yinzer's dream!! \"Pittsburgh Dad\" would love this place n'at!!",
"type": "review",
"business_id": "UsFtqoBl7naz8AVUBZMjQQ"
}

Notice that “business_id” is in both objects. This is the field we wish to join with.

Originally I started with this answer on OpenStack.

So what we really want is a left join of all the data like a database would perform.

The jq software allows one to read functions out of a file, which makes the command line easier. With help from pkoppstein on the jq GitHub, we have a file called leftJoin.jq.

# leftJoin(a1; a2; field) expects a1 and a2 to be arrays of JSON objects
# and that for each of the objects, the field value is a string.
# A left join is performed on "field".
def leftJoin(a1; a2; field):
# hash phase:
(reduce a2[] as $o ({}; . + { ($o | field): $o } )) as $h2
# join phase:
| reduce a1[] as $o ([]; . + [$h2[$o | field] + $o ])|.[];

leftJoin( $file2; $file1; .business_id)

Based on this code, the last line is what passes in the variables for the file names and sets the key used for the join to business_id. Within the reduce commands, the lists of objects are turned into JSON arrays; then the business_id field is looked up and the two JSON objects are concatenated together with the plus sign (+). The final "|.[]", just before the semicolon that ends the definition, turns the JSON array back into a stream of JSON objects. The Ryft appliance only reads in JSON as lists of objects.

If there are any fields that are identically named, the jq code will use the one from file2. So, because both files have the field "type", the new data file will have type = review.
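This right-hand precedence comes from how jq's + operator merges objects; a quick check at the command line (the literal objects here are made up for illustration) shows it:

jq -n '{"type": "business", "stars": 3.0} + {"type": "review", "stars": 5}'

The output keeps the values from the right-hand object wherever the keys collide:

{
  "type": "review",
  "stars": 5
}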

To run this we use the command line of:
jq -nc --slurpfile file1 yelp_business.json --slurpfile file2 yelp_review.json -f leftJoin.jq > yelp_bus_review.json

As a result, this command takes the two files and passes them to the jq code to do the work of joining them together. It writes out a new file called yelp_bus_review.json. It may take a long time to run depending on the size of your files; I ended up with a 4.8 Gigabyte file when finished. Here are two rows in the new file:

{"business_id":"UsFtqoBl7naz8AVUBZMjQQ","full_address":"202 McClure St\nDravosburg, PA 15034","hours":{},"open":true,"categories":["Nightlife"],"city":"Dravosburg","review_count":5,"name":"Clancy's Pub","neighborhoods":[],"longitude":-79.8868138,"state":"PA","stars":4,"latitude":40.3505527,"attributes":{"Happy Hour":true,"Accepts Credit Cards":true,"Good For Groups":true,"Outdoor Seating":false,"Price Range":1},"type":"review","votes":{"funny":0,"useful":0,"cool":0},"user_id":"JPPhyFE-UE453zA6K0TVgw","review_id":"mjCJR33jvUNt41iJCxDU_g","date":"2014-11-28","text":"Cold cheap beer. Good bar food. Good service. \n\nLooking for a great Pittsburgh style fish sandwich, this is the place to go. The breading is light, fish is more than plentiful and a good side of home cut fries. \n\nGood grilled chicken salads or steak. Soup of day is homemade and lots of specials. Great place for lunch or bar snacks and beer."}
{"business_id":"UsFtqoBl7naz8AVUBZMjQQ","full_address":"202 McClure St\nDravosburg, PA 15034","hours":{},"open":true,"categories":["Nightlife"],"city":"Dravosburg","review_count":5,"name":"Clancy's Pub","neighborhoods":[],"longitude":-79.8868138,"state":"PA","stars":2,"latitude":40.3505527,"attributes":{"Happy Hour":true,"Accepts Credit Cards":true,"Good For Groups":true,"Outdoor Seating":false,"Price Range":1},"type":"review","votes":{"funny":0,"useful":0,"cool":0},"user_id":"pl78RcFgklDns8atQegwVA","review_id": "kG7wxkBu62X6yxUuZ5IQ6Q","date":"2016-02-24","text":"Possibly the most overhyped establishment in Allegheny County. If you're not a regular, you will be ignored by those who're tending bar. Beer selection is okay, the prices are good and the service is terrible. I would go here, but only if it was someone else's idea."}

Now we have one large flat file instead of two. This will allow for quicker searches as we do not have to join the data together in the middle of the search.

Please follow us on our website at https://volumeintegration.com and on twitter at volumeint