=== Getting Started
This section covers the DataSource Writer, that is, how to transfer the content of a
Spark Dataset into Neo4j.

Given the following Scala program:
[source,scala]
----
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.util.Random

val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val ds = (1 to total)
  .map(i => Person(name = "Andrea " + i, surname = "Santurbano " + i,
    age = rand.nextInt(100),
    livesIn = Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3)))
  .toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()
----
This program inserts 10 nodes into Neo4j via Spark; as the query after this list can verify, each of them has:

* 2 labels: `Person` and `Customer`
* 4 properties: `name`, `surname`, `age`, and `livesIn`
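To verify the result, you can run a query like the following in the Neo4j Browser (a minimal check sketch; the label and property names are the ones from the example above):

[source,cypher]
----
// each of the 10 nodes should carry both labels and the four properties
MATCH (p:Person:Customer)
RETURN p.name, p.surname, p.age, p.livesIn
----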
==== Save Mode

In order to persist data into Neo4j, the Spark Connector supports two save modes. They work
only if `UNIQUE` or `NODE KEY` constraints are defined in Neo4j for the given properties:

* `SaveMode.ErrorIfExists`: builds a `CREATE` query
* `SaveMode.Overwrite`: builds a `MERGE` query (a constraint example follows this list)
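For example, to back the `SaveMode.Overwrite` writes shown later with a `NODE KEY` constraint on `name` and `surname`, you could first run something like the following (a sketch assuming the Neo4j 3.5/4.0 constraint syntax and an Enterprise edition, since node key constraints are an Enterprise feature; adjust the syntax to your server version):

[source,cypher]
----
// define a node key so that MERGE can reliably match on name and surname
CREATE CONSTRAINT ON (p:Person) ASSERT (p.name, p.surname) IS NODE KEY
----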
==== Options

The DataSource Writer has several options for connecting to Neo4j and persisting data.

.Most common configuration settings
|===
|Setting name |Description |Default value |Required

|`labels`
|Colon-separated list of the labels to attach to the nodes.
|_(none)_
|No

|`batch.size`
|The number of rows sent to Neo4j in a single batch.
|5000
|No

|`node.keys`
|Comma-separated list of the properties used as node keys when you use
`SaveMode.Overwrite`.
|_(none)_
|No
|`transaction.codes.fail`
|Comma-separated list of Neo4j error codes that cause the transaction to fail.
|_(none)_
|No
|===
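As an illustration, a write that tunes the batch size while upserting on `name` and `surname` could look like this (a sketch reusing the `ds` Dataset from the example above; all options are from the table):

[source,scala]
----
ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  // upsert on these properties instead of always creating new nodes
  .option("node.keys", "name,surname")
  // send rows to Neo4j in batches of 10000 instead of the default 5000
  .option("batch.size", "10000")
  .save()
----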
==== How the Spark Connector persists the data

[NOTE]
The Neo4j Spark Connector uses batch writes to speed up the ingestion process, so if the
process fails at some point, all the batches written before the failure remain persisted.
===== Nodes

If you use the `labels` option, the Spark Connector persists the entire Dataset as nodes.
Depending on the `SaveMode`, it either `CREATE`s or `MERGE`s nodes (in the latter case using
the `node.keys` properties).

The nodes are sent to Neo4j in batches of rows, sized via the `batch.size` option, and under
the hood the connector performs an `UNWIND` operation over each batch.

For example, given the following script:
[source,scala]
----
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.util.Random

val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val ds = (1 to total)
  .map(i => Person(name = "Andrea " + i, surname = "Santurbano " + i,
    age = rand.nextInt(100),
    livesIn = Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3)))
  .toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()
----
Under the hood the Spark Connector runs the following Cypher query:

[source,cypher]
----
UNWIND $events AS event
CREATE (n:`Person`:`Customer`) SET n += event.properties
----
For the same script as above, but with the write part changed as follows:

[source,scala]
----
ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "name,surname")
  .save()
----
Under the hood the Spark Connector runs the following Cypher query:

[source,cypher]
----
UNWIND $events AS event
MERGE (n:`Person`:`Customer` {name: event.keys.name, surname: event.keys.surname})
SET n += event.properties
----
If a column value is a `Map<String, Value>` (where `Value` can be any supported
https://neo4j.com/docs/cypher-manual/current/syntax/values/[Neo4j type]), the connector
automatically tries to flatten it. So if you have the following Dataset:
|===
|id |name |lives_in

|1
|Andrea Santurbano
|{address: 'Times Square, 1', city: 'NY', state: 'NY'}

|2
|Davide Fantuzzi
|{address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'}
|===
Under the hood the Spark Connector flattens the data in this way:
|===
|id |name |`lives_in.address` |`lives_in.city` |`lives_in.state`

|1
|Andrea Santurbano
|Times Square, 1
|NY
|NY

|2
|Davide Fantuzzi
|Statue of Liberty, 10
|NY
|NY
|===
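The following sketch shows how such a Dataset might be produced and written (the `Customer` case class and its values are hypothetical, mirroring the table above):

[source,scala]
----
import org.apache.spark.sql.{SaveMode, SparkSession}

val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

case class Customer(id: Long, name: String, lives_in: Map[String, String])

val ds = Seq(
    Customer(1, "Andrea Santurbano",
      Map("address" -> "Times Square, 1", "city" -> "NY", "state" -> "NY")),
    Customer(2, "Davide Fantuzzi",
      Map("address" -> "Statue of Liberty, 10", "city" -> "NY", "state" -> "NY"))
  ).toDS()

// the map column is flattened into the properties
// `lives_in.address`, `lives_in.city`, and `lives_in.state`
ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Customer")
  .save()
----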
===== Query

If you use the `query` option, the Spark Connector persists the entire Dataset by using the
provided query.

The rows are sent to Neo4j in batches, sized via the `batch.size` option, and under the hood
the connector performs an `UNWIND` operation over each batch.

So given the following simple Spark program:
[source,scala]
----
ds.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", "CREATE (n:Person {fullName: event.name + event.surname})")
  .save()
----
Under the hood the Spark Connector runs the following Cypher query:

[source,cypher]
----
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + event.surname})
----
Here `event` represents each Dataset row.
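As a variation (a hypothetical sketch, not taken from the connector documentation), the same mechanism can make the write idempotent by using `MERGE` in the provided query:

[source,scala]
----
ds.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  // MERGE instead of CREATE avoids duplicate Person nodes on re-runs
  .option("query", "MERGE (n:Person {fullName: event.name + event.surname})")
  .save()
----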