=== Getting Started

This section covers the DataSource Writer, that is, how to transfer the content of a Spark Dataset into Neo4j.

Given the following Scala program:

[source,scala]
----
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.util.Random

val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val ds = (1 to total)
  .map(i => Person(name = "Andrea " + i,
    surname = "Santurbano " + i,
    age = rand.nextInt(100),
    livesIn = Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3)))
  .toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()
----

This program inserts 10 nodes into Neo4j via Spark, and each of these nodes has:

* 2 labels: `Person` and `Customer`
* 4 properties: `name`, `surname`, `age`, and `livesIn`

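To check the result, you can read the nodes back with the same connector. The following is a minimal sketch, assuming the DataSource reader accepts the same `url` and `labels` options as the writer; adapt the connection details to your environment.

[source,scala]
----
// Sketch: read back the nodes that were just written (assumes the reader
// side of the connector accepts the same `url` and `labels` options).
val readDf = sparkSession.read
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .load()

readDf.show(10, truncate = false) // expect 10 rows with name, surname, age, livesIn
----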

==== Save Mode

In order to persist data into Neo4j, the Spark Connector supports two save modes, which work only if `UNIQUE` or `NODE KEY` constraints are defined in Neo4j for the given properties:

* `SaveMode.ErrorIfExists`: builds a `CREATE` query
* `SaveMode.Overwrite`: builds a `MERGE` query

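For `SaveMode.Overwrite`, the constraint must exist before the write runs. The sketch below creates a `NODE KEY` constraint on `name` and `surname` through the Neo4j Java driver (4.x package names assumed); the credentials and the constraint name `person_node_key` are placeholders, older Neo4j versions use the `ON ... ASSERT` syntax instead of `FOR ... REQUIRE`, and `NODE KEY` constraints require Neo4j Enterprise Edition.

[source,scala]
----
import org.neo4j.driver.{AuthTokens, GraphDatabase}

// Sketch: create the NODE KEY constraint needed by SaveMode.Overwrite.
// Credentials and constraint name are placeholders; adjust the Cypher
// syntax to your Neo4j version.
val driver = GraphDatabase.driver("bolt://localhost:7687",
  AuthTokens.basic("neo4j", "password"))
val session = driver.session()
try {
  session.run(
    """CREATE CONSTRAINT person_node_key IF NOT EXISTS
      |FOR (p:Person) REQUIRE (p.name, p.surname) IS NODE KEY""".stripMargin)
} finally {
  session.close()
  driver.close()
}
----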

==== Options

The DataSource Writer has several options for connecting to Neo4j and persisting data.

.Most commonly needed configuration settings
|===
|Setting Name |Description |Default Value |Required

|`labels`
|Colon-separated list of the labels to attach to the node
|_(none)_
|No

|`batch.size`
|The number of rows sent to Neo4j in a single batch
|5000
|No

|`node.keys`
|Comma-separated list of properties considered as node keys when using `SaveMode.Overwrite`
|_(none)_
|No

|`transaction.codes.fail`
|Comma-separated list of Neo4j error codes that cause the transaction to fail
|_(none)_
|No
|===
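
These options can be combined with the write shown earlier. A minimal sketch, assuming the same `Person` Dataset `ds` from the previous example and a local Neo4j instance:

[source,scala]
----
// Sketch: overwrite Person nodes in batches of 1000, using name and surname
// as node keys (requires a matching UNIQUE or NODE KEY constraint in Neo4j).
ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "name,surname")
  .option("batch.size", "1000")
  .save()
----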

==== How the Spark Connector persists the data

[NOTE]
Since the Neo4j Spark Connector performs batch writes to speed up the ingestion process, if the job fails at some point, all the batches written before the failure remain persisted.

===== Nodes

If you use the `labels` option, the Spark Connector persists the entire Dataset as nodes. Depending on the `SaveMode`, it will either `CREATE` or `MERGE` nodes (in the latter case using the `node.keys` properties).

The nodes are sent to Neo4j in batches of rows, as defined by the `batch.size` option, and under the hood an `UNWIND` operation is performed over each batch.

For example, given the following script:

[source,scala]
----
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.util.Random

val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val ds = (1 to total)
  .map(i => Person(name = "Andrea " + i,
    surname = "Santurbano " + i,
    age = rand.nextInt(100),
    livesIn = Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3)))
  .toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()
----

Under the hood, the Spark Connector performs the following Cypher query:

[source,cypher]
----
UNWIND $events AS event
CREATE (n:`Person`:`Customer`) SET n += event.properties
----

For the same script as above, but with this part changed:

[source,scala]
----
ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "name,surname")
  .save()
----

Under the hood, the Spark Connector performs the following Cypher query:

[source,cypher]
----
UNWIND $events AS event
MERGE (n:`Person`:`Customer` {name: event.keys.name, surname: event.keys.surname})
SET n += event.properties
----

If the column value is a `Map<String, Value>` (where the value can be any supported
https://neo4j.com/docs/cypher-manual/current/syntax/values/[Neo4j Type]), the Connector automatically
tries to flatten it. So if you have the following Dataset:

|===
|id |name |lives_in

|1
|Andrea Santurbano
|{address: 'Times Square, 1', city: 'NY', state: 'NY'}

|2
|Davide Fantuzzi
|{address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'}
|===

Under the hood, the Spark Connector flattens the data in this way:

  131. |===
  132. |id |name |`lives_in.address` |`lives_in.address` |`lives_in.city` |`lives_in.state`
  133. |1
  134. |Andrea Santurbano
  135. |Times Square, 1
  136. |NY
  137. |NY
  138. |1
  139. |Davide Fantuzzi
  140. |Statue of Liberty, 10
  141. |NY
  142. |NY
  143. |===
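
The sketch below reproduces this case end to end, assuming a local Neo4j instance; the `PersonWithAddress` case class is made up for illustration, and the connector is expected to flatten `lives_in` into `lives_in.address`, `lives_in.city`, and `lives_in.state` properties on each node.

[source,scala]
----
import org.apache.spark.sql.{SaveMode, SparkSession}

val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._

// Sketch: a Dataset with a Map column that the connector flattens on write.
case class PersonWithAddress(id: Long, name: String, lives_in: Map[String, String])

val people = Seq(
  PersonWithAddress(1, "Andrea Santurbano",
    Map("address" -> "Times Square, 1", "city" -> "NY", "state" -> "NY")),
  PersonWithAddress(2, "Davide Fantuzzi",
    Map("address" -> "Statue of Liberty, 10", "city" -> "NY", "state" -> "NY"))
).toDS()

people.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  .save()
----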

===== Query

If you use the `query` option, the Spark Connector persists the entire Dataset by using the provided query. The rows are sent to Neo4j in batches of the size defined by the `batch.size` option, and under the hood an `UNWIND` operation is performed over each batch.

So given the following simple Spark program:

[source,scala]
----
ds.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", "CREATE (n:Person{fullName: event.name + event.surname})")
  .save()
----

Under the hood, the Spark Connector performs the following Cypher query:

[source,cypher]
----
UNWIND $events AS event
CREATE (n:Person{fullName: event.name + event.surname})
----

Where `event` represents each Dataset row.
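
Because the provided Cypher is appended after the `UNWIND` clause, it is not limited to creating a single node. Below is a hedged sketch, assuming the same `Person` Dataset `ds`; the `Family` label and the `BELONGS_TO` relationship type are made up for illustration:

[source,scala]
----
// Sketch: use the `query` option to create nodes and relationships per row.
// `event` exposes the Dataset columns (name, surname, ...).
ds.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query",
    """MERGE (p:Person {name: event.name, surname: event.surname})
      |MERGE (f:Family {surname: event.surname})
      |MERGE (p)-[:BELONGS_TO]->(f)""".stripMargin)
  .save()
----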