=== Getting Started

This section covers the DataSource Writer, that is, how to transfer the content of a Spark Dataset into Neo4j.

Given the following Scala program:

[source,scala]
----
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.util.Random

val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val ds = (1 to total)
  .map(i => Person(name = "Andrea " + i, "Santurbano " + i, rand.nextInt(100),
    Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3)))
  .toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()
----

this inserts 10 nodes into Neo4j via Spark, each of them with:

* 2 `labels`: `Person` and `Customer`
* 4 `properties`: `name`, `surname`, `age`, and `livesIn`

==== Save Mode

In order to persist data into Neo4j, the Spark Connector supports two save modes that work only if `UNIQUE` or `NODE KEY` constraints are defined in Neo4j for the given properties:

* `SaveMode.ErrorIfExists`: builds a `CREATE` query
* `SaveMode.Overwrite`: builds a `MERGE` query

==== Options

The DataSource Writer has several options to connect and persist data into Neo4j.

.Most commonly needed configuration settings
|===
|Setting Name |Description |Default Value |Required

|`labels`
|Colon-separated list of the labels to attach to the node
|_(none)_
|No

|`batch.size`
|The number of rows sent to Neo4j as a single batch
|5000
|No

|`node.keys`
|Comma-separated list of properties considered as node keys when using `SaveMode.Overwrite`
|_(none)_
|No

|`transaction.codes.fail`
|Comma-separated list of Neo4j codes that cause the transaction to fail
|_(none)_
|No

|===
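These settings are passed in the same way as `url` and `labels` in the example above. As a minimal sketch, assuming the same `ds` Dataset as in the Getting Started example (the batch size and the Neo4j status code used here are purely illustrative, not defaults):

[source,scala]
----
ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  // Illustrative value: group the rows into batches of 1000 per transaction
  .option("batch.size", "1000")
  // Illustrative value: treat this Neo4j status code as a transaction failure
  .option("transaction.codes.fail", "Neo.ClientError.Schema.ConstraintValidationFailed")
  .save()
----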
==== How the Spark Connector persists the data

[NOTE]
The Neo4j Spark Connector performs batch writes in order to speed up the ingestion process, so if the process fails at some point, all the batches written before the failure are already persisted.

===== Nodes

If you use the `labels` option, the Spark Connector persists the entire Dataset as nodes. Depending on the `SaveMode`, it either `CREATE`s or `MERGE`s nodes (in the latter case using the `node.keys` properties). The nodes are sent to Neo4j in batches of rows, as defined by the `batch.size` property, and an `UNWIND` operation is performed under the hood over each batch.

For example, given the following script:

[source,scala]
----
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.util.Random

val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val ds = (1 to total)
  .map(i => Person(name = "Andrea " + i, "Santurbano " + i, rand.nextInt(100),
    Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3)))
  .toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()
----

under the hood the Spark Connector performs the following Cypher query:

[source,cypher]
----
UNWIND $events AS event
CREATE (n:`Person`:`Customer`)
SET n += event.properties
----

For the same script as above, except for this part:

[source,scala]
----
ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "name,surname")
  .save()
----

the Spark Connector performs the following Cypher query under the hood:

[source,cypher]
----
UNWIND $events AS event
MERGE (n:`Person`:`Customer` {name: event.keys.name, surname: event.keys.surname})
SET n += event.properties
----

If a column value is a Map (whose values can be any supported https://neo4j.com/docs/cypher-manual/current/syntax/values/[Neo4j type]), the Connector automatically tries to flatten it. So if you have the following Dataset:

|===
|id |name |lives_in

|1 |Andrea Santurbano |{address: 'Times Square, 1', city: 'NY', state: 'NY'}
|2 |Davide Fantuzzi |{address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'}

|===

under the hood the Spark Connector flattens the data in this way:

|===
|id |name |`lives_in.address` |`lives_in.city` |`lives_in.state`

|1 |Andrea Santurbano |Times Square, 1 |NY |NY
|2 |Davide Fantuzzi |Statue of Liberty, 10 |NY |NY

|===

===== Query

If you use the `query` option, the Spark Connector persists the entire Dataset by using the provided query. The rows are sent to Neo4j in batches, as defined by the `batch.size` property, and an `UNWIND` operation is performed under the hood over each batch.

So, given the following simple Spark program:

[source,scala]
----
ds.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", "CREATE (n:Person{fullName: event.name + event.surname})")
  .save()
----

under the hood the Spark Connector performs the following Cypher query:

[source,cypher]
----
UNWIND $events AS event
CREATE (n:Person{fullName: event.name + event.surname})
----

where `event` represents each Dataset row.
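For completeness, here is a self-contained sketch of the query-based write above. The Dataset construction is only illustrative (it mirrors the Getting Started example without the `Point3d` column); the write call itself matches the snippet above:

[source,scala]
----
import org.apache.spark.sql.SparkSession

import scala.util.Random

val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

case class Person(name: String, surname: String, age: Int)

val rand = Random
// Illustrative Dataset of 10 people
val ds = (1 to 10)
  .map(i => Person("Andrea " + i, "Santurbano " + i, rand.nextInt(100)))
  .toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  // Each Dataset row is available to the query as `event`, via the UNWIND shown above
  .option("query", "CREATE (n:Person{fullName: event.name + event.surname})")
  .save()
----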